Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * tablesync.c
3 : * PostgreSQL logical replication: initial table data synchronization
4 : *
5 : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/tablesync.c
9 : *
10 : * NOTES
11 : * This file contains code for initial table data synchronization for
12 : * logical replication.
13 : *
14 : * The initial data synchronization is done separately for each table,
15 : * in a separate apply worker that only fetches the initial snapshot data
16 : * from the publisher and then synchronizes the position in the stream with
17 : * the leader apply worker.
18 : *
19 : * There are several reasons for doing the synchronization this way:
20 : * - It allows us to parallelize the initial data synchronization
21 : * which lowers the time needed for it to happen.
22 : * - The initial synchronization does not have to hold the xid and LSN
23 : * for the time it takes to copy data of all tables, causing less
24 : * bloat and lower disk consumption compared to doing the
25 : * synchronization in a single process for the whole database.
26 : * - It allows us to synchronize any tables added after the initial
27 : * synchronization has finished.
28 : *
29 : * The stream position synchronization works in multiple steps:
30 : * - Apply worker requests a tablesync worker to start, setting the new
31 : * table state to INIT.
32 : * - Tablesync worker starts; changes table state from INIT to DATASYNC while
33 : * copying.
34 : * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
35 : * worker specific) state to indicate when the copy phase has completed, so
36 : * if the worker crashes with this (non-memory) state then the copy will not
37 : * be re-attempted.
38 : * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
39 : * - Apply worker periodically checks for tables in SYNCWAIT state. When
40 : * any appear, it sets the table state to CATCHUP and starts loop-waiting
41 : * until either the table state is set to SYNCDONE or the sync worker
42 : * exits.
43 : * - After the sync worker has seen the state change to CATCHUP, it will
44 : * read the stream and apply changes (acting like an apply worker) until
45 : * it catches up to the specified stream position. Then it sets the
46 : * state to SYNCDONE. There might be zero changes applied between
47 : * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
48 : * apply worker.
49 : * - Once the state is set to SYNCDONE, the apply will continue tracking
50 : * the table until it reaches the SYNCDONE stream position, at which
51 : * point it sets state to READY and stops tracking. Again, there might
52 : * be zero changes in between.
53 : *
54 : * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
55 : * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
56 : *
57 : * The catalog pg_subscription_rel is used to keep information about
58 : * subscribed tables and their state. The catalog holds all states
59 : * except SYNCWAIT and CATCHUP which are only in shared memory.
60 : *
61 : * Example flows look like this:
62 : * - Apply is in front:
63 : * sync:8
64 : * -> set in catalog FINISHEDCOPY
65 : * -> set in memory SYNCWAIT
66 : * apply:10
67 : * -> set in memory CATCHUP
68 : * -> enter wait-loop
69 : * sync:10
70 : * -> set in catalog SYNCDONE
71 : * -> exit
72 : * apply:10
73 : * -> exit wait-loop
74 : * -> continue rep
75 : * apply:11
76 : * -> set in catalog READY
77 : *
78 : * - Sync is in front:
79 : * sync:10
80 : * -> set in catalog FINISHEDCOPY
81 : * -> set in memory SYNCWAIT
82 : * apply:8
83 : * -> set in memory CATCHUP
84 : * -> continue per-table filtering
85 : * sync:10
86 : * -> set in catalog SYNCDONE
87 : * -> exit
88 : * apply:10
89 : * -> set in catalog READY
90 : * -> stop per-table filtering
91 : * -> continue rep
92 : *-------------------------------------------------------------------------
93 : */
94 :
95 : #include "postgres.h"
96 :
97 : #include "access/table.h"
98 : #include "access/xact.h"
99 : #include "catalog/indexing.h"
100 : #include "catalog/pg_subscription_rel.h"
101 : #include "catalog/pg_type.h"
102 : #include "commands/copy.h"
103 : #include "miscadmin.h"
104 : #include "nodes/makefuncs.h"
105 : #include "parser/parse_relation.h"
106 : #include "pgstat.h"
107 : #include "replication/logicallauncher.h"
108 : #include "replication/logicalrelation.h"
109 : #include "replication/logicalworker.h"
110 : #include "replication/origin.h"
111 : #include "replication/slot.h"
112 : #include "replication/walreceiver.h"
113 : #include "replication/worker_internal.h"
114 : #include "storage/ipc.h"
115 : #include "storage/lmgr.h"
116 : #include "utils/acl.h"
117 : #include "utils/array.h"
118 : #include "utils/builtins.h"
119 : #include "utils/lsyscache.h"
120 : #include "utils/memutils.h"
121 : #include "utils/rls.h"
122 : #include "utils/snapmgr.h"
123 : #include "utils/syscache.h"
124 : #include "utils/usercontext.h"
125 :
126 : typedef enum
127 : {
128 : SYNC_TABLE_STATE_NEEDS_REBUILD,
129 : SYNC_TABLE_STATE_REBUILD_STARTED,
130 : SYNC_TABLE_STATE_VALID,
131 : } SyncingTablesState;
132 :
133 : static SyncingTablesState table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
134 : static List *table_states_not_ready = NIL;
135 : static bool FetchTableStates(bool *started_tx);
136 :
137 : static StringInfo copybuf = NULL;
138 :
139 : /*
140 : * Exit routine for synchronization worker.
141 : */
142 : pg_noreturn static void
143 366 : finish_sync_worker(void)
144 : {
145 : /*
146 : * Commit any outstanding transaction. This is the usual case, unless
147 : * there was nothing to do for the table.
148 : */
149 366 : if (IsTransactionState())
150 : {
151 366 : CommitTransactionCommand();
152 366 : pgstat_report_stat(true);
153 : }
154 :
155 : /* And flush all writes. */
156 366 : XLogFlush(GetXLogWriteRecPtr());
157 :
158 366 : StartTransactionCommand();
159 366 : ereport(LOG,
160 : (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
161 : MySubscription->name,
162 : get_rel_name(MyLogicalRepWorker->relid))));
163 366 : CommitTransactionCommand();
164 :
165 : /* Find the leader apply worker and signal it. */
166 366 : logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
167 :
168 : /* Stop gracefully */
169 366 : proc_exit(0);
170 : }
171 :
172 : /*
173 : * Wait until the relation sync state is set in the catalog to the expected
174 : * one; return true when it happens.
175 : *
176 : * Returns false if the table sync worker or the table itself have
177 : * disappeared, or the table state has been reset.
178 : *
179 : * Currently, this is used in the apply worker when transitioning from
180 : * CATCHUP state to SYNCDONE.
181 : */
182 : static bool
183 360 : wait_for_relation_state_change(Oid relid, char expected_state)
184 : {
185 : char state;
186 :
187 : for (;;)
188 404 : {
189 : LogicalRepWorker *worker;
190 : XLogRecPtr statelsn;
191 :
192 764 : CHECK_FOR_INTERRUPTS();
193 :
194 764 : InvalidateCatalogSnapshot();
195 764 : state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
196 : relid, &statelsn);
197 :
198 764 : if (state == SUBREL_STATE_UNKNOWN)
199 0 : break;
200 :
201 764 : if (state == expected_state)
202 360 : return true;
203 :
204 : /* Check if the sync worker is still running and bail if not. */
205 404 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
206 404 : worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
207 : false);
208 404 : LWLockRelease(LogicalRepWorkerLock);
209 404 : if (!worker)
210 0 : break;
211 :
212 404 : (void) WaitLatch(MyLatch,
213 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
214 : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
215 :
216 404 : ResetLatch(MyLatch);
217 : }
218 :
219 0 : return false;
220 : }
221 :
222 : /*
223 : * Wait until the apply worker changes the state of our synchronization
224 : * worker to the expected one.
225 : *
226 : * Used when transitioning from SYNCWAIT state to CATCHUP.
227 : *
228 : * Returns false if the apply worker has disappeared.
229 : */
230 : static bool
231 366 : wait_for_worker_state_change(char expected_state)
232 : {
233 : int rc;
234 :
235 : for (;;)
236 366 : {
237 : LogicalRepWorker *worker;
238 :
239 732 : CHECK_FOR_INTERRUPTS();
240 :
241 : /*
242 : * Done if already in correct state. (We assume this fetch is atomic
243 : * enough to not give a misleading answer if we do it with no lock.)
244 : */
245 732 : if (MyLogicalRepWorker->relstate == expected_state)
246 366 : return true;
247 :
248 : /*
249 : * Bail out if the apply worker has died, else signal it we're
250 : * waiting.
251 : */
252 366 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
253 366 : worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
254 : InvalidOid, false);
255 366 : if (worker && worker->proc)
256 366 : logicalrep_worker_wakeup_ptr(worker);
257 366 : LWLockRelease(LogicalRepWorkerLock);
258 366 : if (!worker)
259 0 : break;
260 :
261 : /*
262 : * Wait. We expect to get a latch signal back from the apply worker,
263 : * but use a timeout in case it dies without sending one.
264 : */
265 366 : rc = WaitLatch(MyLatch,
266 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
267 : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
268 :
269 366 : if (rc & WL_LATCH_SET)
270 366 : ResetLatch(MyLatch);
271 : }
272 :
273 0 : return false;
274 : }
275 :
276 : /*
277 : * Callback from syscache invalidation.
278 : */
279 : void
280 3388 : invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
281 : {
282 3388 : table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
283 3388 : }
284 :
285 : /*
286 : * Handle table synchronization cooperation from the synchronization
287 : * worker.
288 : *
289 : * If the sync worker is in CATCHUP state and reached (or passed) the
290 : * predetermined synchronization point in the WAL stream, mark the table as
291 : * SYNCDONE and finish.
292 : */
293 : static void
294 438 : process_syncing_tables_for_sync(XLogRecPtr current_lsn)
295 : {
296 438 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
297 :
298 438 : if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
299 438 : current_lsn >= MyLogicalRepWorker->relstate_lsn)
300 : {
301 : TimeLineID tli;
302 366 : char syncslotname[NAMEDATALEN] = {0};
303 366 : char originname[NAMEDATALEN] = {0};
304 :
305 366 : MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
306 366 : MyLogicalRepWorker->relstate_lsn = current_lsn;
307 :
308 366 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
309 :
310 : /*
311 : * UpdateSubscriptionRelState must be called within a transaction.
312 : */
313 366 : if (!IsTransactionState())
314 366 : StartTransactionCommand();
315 :
316 366 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
317 366 : MyLogicalRepWorker->relid,
318 366 : MyLogicalRepWorker->relstate,
319 366 : MyLogicalRepWorker->relstate_lsn,
320 : false);
321 :
322 : /*
323 : * End streaming so that LogRepWorkerWalRcvConn can be used to drop
324 : * the slot.
325 : */
326 366 : walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
327 :
328 : /*
329 : * Cleanup the tablesync slot.
330 : *
331 : * This has to be done after updating the state because otherwise if
332 : * there is an error while doing the database operations we won't be
333 : * able to rollback dropped slot.
334 : */
335 366 : ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
336 366 : MyLogicalRepWorker->relid,
337 : syncslotname,
338 : sizeof(syncslotname));
339 :
340 : /*
341 : * It is important to give an error if we are unable to drop the slot,
342 : * otherwise, it won't be dropped till the corresponding subscription
343 : * is dropped. So passing missing_ok = false.
344 : */
345 366 : ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
346 :
347 366 : CommitTransactionCommand();
348 366 : pgstat_report_stat(false);
349 :
350 : /*
351 : * Start a new transaction to clean up the tablesync origin tracking.
352 : * This transaction will be ended within the finish_sync_worker().
353 : * Now, even, if we fail to remove this here, the apply worker will
354 : * ensure to clean it up afterward.
355 : *
356 : * We need to do this after the table state is set to SYNCDONE.
357 : * Otherwise, if an error occurs while performing the database
358 : * operation, the worker will be restarted and the in-memory state of
359 : * replication progress (remote_lsn) won't be rolled-back which would
360 : * have been cleared before restart. So, the restarted worker will use
361 : * invalid replication progress state resulting in replay of
362 : * transactions that have already been applied.
363 : */
364 366 : StartTransactionCommand();
365 :
366 366 : ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
367 366 : MyLogicalRepWorker->relid,
368 : originname,
369 : sizeof(originname));
370 :
371 : /*
372 : * Resetting the origin session removes the ownership of the slot.
373 : * This is needed to allow the origin to be dropped.
374 : */
375 366 : replorigin_session_reset();
376 366 : replorigin_session_origin = InvalidRepOriginId;
377 366 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
378 366 : replorigin_session_origin_timestamp = 0;
379 :
380 : /*
381 : * Drop the tablesync's origin tracking if exists.
382 : *
383 : * There is a chance that the user is concurrently performing refresh
384 : * for the subscription where we remove the table state and its origin
385 : * or the apply worker would have removed this origin. So passing
386 : * missing_ok = true.
387 : */
388 366 : replorigin_drop_by_name(originname, true, false);
389 :
390 366 : finish_sync_worker();
391 : }
392 : else
393 72 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
394 72 : }
395 :
396 : /*
397 : * Handle table synchronization cooperation from the apply worker.
398 : *
399 : * Walk over all subscription tables that are individually tracked by the
400 : * apply process (currently, all that have state other than
401 : * SUBREL_STATE_READY) and manage synchronization for them.
402 : *
403 : * If there are tables that need synchronizing and are not being synchronized
404 : * yet, start sync workers for them (if there are free slots for sync
405 : * workers). To prevent starting the sync worker for the same relation at a
406 : * high frequency after a failure, we store its last start time with each sync
407 : * state info. We start the sync worker for the same relation after waiting
408 : * at least wal_retrieve_retry_interval.
409 : *
410 : * For tables that are being synchronized already, check if sync workers
411 : * either need action from the apply worker or have finished. This is the
412 : * SYNCWAIT to CATCHUP transition.
413 : *
414 : * If the synchronization position is reached (SYNCDONE), then the table can
415 : * be marked as READY and is no longer tracked.
416 : */
417 : static void
418 9764 : process_syncing_tables_for_apply(XLogRecPtr current_lsn)
419 : {
420 : struct tablesync_start_time_mapping
421 : {
422 : Oid relid;
423 : TimestampTz last_start_time;
424 : };
425 : static HTAB *last_start_times = NULL;
426 : ListCell *lc;
427 9764 : bool started_tx = false;
428 9764 : bool should_exit = false;
429 9764 : Relation rel = NULL;
430 :
431 : Assert(!IsTransactionState());
432 :
433 : /* We need up-to-date sync state info for subscription tables here. */
434 9764 : FetchTableStates(&started_tx);
435 :
436 : /*
437 : * Prepare a hash table for tracking last start times of workers, to avoid
438 : * immediate restarts. We don't need it if there are no tables that need
439 : * syncing.
440 : */
441 9764 : if (table_states_not_ready != NIL && !last_start_times)
442 238 : {
443 : HASHCTL ctl;
444 :
445 238 : ctl.keysize = sizeof(Oid);
446 238 : ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
447 238 : last_start_times = hash_create("Logical replication table sync worker start times",
448 : 256, &ctl, HASH_ELEM | HASH_BLOBS);
449 : }
450 :
451 : /*
452 : * Clean up the hash table when we're done with all tables (just to
453 : * release the bit of memory).
454 : */
455 9526 : else if (table_states_not_ready == NIL && last_start_times)
456 : {
457 178 : hash_destroy(last_start_times);
458 178 : last_start_times = NULL;
459 : }
460 :
461 : /*
462 : * Process all tables that are being synchronized.
463 : */
464 13116 : foreach(lc, table_states_not_ready)
465 : {
466 3356 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
467 :
468 3356 : if (rstate->state == SUBREL_STATE_SYNCDONE)
469 : {
470 : /*
471 : * Apply has caught up to the position where the table sync has
472 : * finished. Mark the table as ready so that the apply will just
473 : * continue to replicate it normally.
474 : */
475 358 : if (current_lsn >= rstate->lsn)
476 : {
477 : char originname[NAMEDATALEN];
478 :
479 356 : rstate->state = SUBREL_STATE_READY;
480 356 : rstate->lsn = current_lsn;
481 356 : if (!started_tx)
482 : {
483 18 : StartTransactionCommand();
484 18 : started_tx = true;
485 : }
486 :
487 : /*
488 : * Remove the tablesync origin tracking if exists.
489 : *
490 : * There is a chance that the user is concurrently performing
491 : * refresh for the subscription where we remove the table
492 : * state and its origin or the tablesync worker would have
493 : * already removed this origin. We can't rely on tablesync
494 : * worker to remove the origin tracking as if there is any
495 : * error while dropping we won't restart it to drop the
496 : * origin. So passing missing_ok = true.
497 : *
498 : * Lock the subscription and origin in the same order as we
499 : * are doing during DDL commands to avoid deadlocks. See
500 : * AlterSubscription_refresh.
501 : */
502 356 : LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
503 : 0, AccessShareLock);
504 :
505 356 : if (!rel)
506 356 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
507 :
508 356 : ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
509 : rstate->relid,
510 : originname,
511 : sizeof(originname));
512 356 : replorigin_drop_by_name(originname, true, false);
513 :
514 : /*
515 : * Update the state to READY only after the origin cleanup.
516 : */
517 356 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
518 356 : rstate->relid, rstate->state,
519 : rstate->lsn, true);
520 : }
521 : }
522 : else
523 : {
524 : LogicalRepWorker *syncworker;
525 :
526 : /*
527 : * Look for a sync worker for this relation.
528 : */
529 2998 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
530 :
531 2998 : syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
532 : rstate->relid, false);
533 :
534 2998 : if (syncworker)
535 : {
536 : /* Found one, update our copy of its state */
537 1344 : SpinLockAcquire(&syncworker->relmutex);
538 1344 : rstate->state = syncworker->relstate;
539 1344 : rstate->lsn = syncworker->relstate_lsn;
540 1344 : if (rstate->state == SUBREL_STATE_SYNCWAIT)
541 : {
542 : /*
543 : * Sync worker is waiting for apply. Tell sync worker it
544 : * can catchup now.
545 : */
546 360 : syncworker->relstate = SUBREL_STATE_CATCHUP;
547 360 : syncworker->relstate_lsn =
548 360 : Max(syncworker->relstate_lsn, current_lsn);
549 : }
550 1344 : SpinLockRelease(&syncworker->relmutex);
551 :
552 : /* If we told worker to catch up, wait for it. */
553 1344 : if (rstate->state == SUBREL_STATE_SYNCWAIT)
554 : {
555 : /* Signal the sync worker, as it may be waiting for us. */
556 360 : if (syncworker->proc)
557 360 : logicalrep_worker_wakeup_ptr(syncworker);
558 :
559 : /* Now safe to release the LWLock */
560 360 : LWLockRelease(LogicalRepWorkerLock);
561 :
562 360 : if (started_tx)
563 : {
564 : /*
565 : * We must commit the existing transaction to release
566 : * the existing locks before entering a busy loop.
567 : * This is required to avoid any undetected deadlocks
568 : * due to any existing lock as deadlock detector won't
569 : * be able to detect the waits on the latch.
570 : *
571 : * Also close any tables prior to the commit.
572 : */
573 360 : if (rel)
574 : {
575 54 : table_close(rel, NoLock);
576 54 : rel = NULL;
577 : }
578 360 : CommitTransactionCommand();
579 360 : pgstat_report_stat(false);
580 : }
581 :
582 : /*
583 : * Enter busy loop and wait for synchronization worker to
584 : * reach expected state (or die trying).
585 : */
586 360 : StartTransactionCommand();
587 360 : started_tx = true;
588 :
589 360 : wait_for_relation_state_change(rstate->relid,
590 : SUBREL_STATE_SYNCDONE);
591 : }
592 : else
593 984 : LWLockRelease(LogicalRepWorkerLock);
594 : }
595 : else
596 : {
597 : /*
598 : * If there is no sync worker for this table yet, count
599 : * running sync workers for this subscription, while we have
600 : * the lock.
601 : */
602 : int nsyncworkers =
603 1654 : logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
604 :
605 : /* Now safe to release the LWLock */
606 1654 : LWLockRelease(LogicalRepWorkerLock);
607 :
608 : /*
609 : * If there are free sync worker slot(s), start a new sync
610 : * worker for the table.
611 : */
612 1654 : if (nsyncworkers < max_sync_workers_per_subscription)
613 : {
614 402 : TimestampTz now = GetCurrentTimestamp();
615 : struct tablesync_start_time_mapping *hentry;
616 : bool found;
617 :
618 402 : hentry = hash_search(last_start_times, &rstate->relid,
619 : HASH_ENTER, &found);
620 :
621 428 : if (!found ||
622 26 : TimestampDifferenceExceeds(hentry->last_start_time, now,
623 : wal_retrieve_retry_interval))
624 : {
625 : /*
626 : * Set the last_start_time even if we fail to start
627 : * the worker, so that we won't retry until
628 : * wal_retrieve_retry_interval has elapsed.
629 : */
630 384 : hentry->last_start_time = now;
631 384 : (void) logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
632 384 : MyLogicalRepWorker->dbid,
633 384 : MySubscription->oid,
634 384 : MySubscription->name,
635 384 : MyLogicalRepWorker->userid,
636 : rstate->relid,
637 : DSM_HANDLE_INVALID,
638 : false);
639 : }
640 : }
641 : }
642 : }
643 : }
644 :
645 : /* Close table if opened */
646 9760 : if (rel)
647 302 : table_close(rel, NoLock);
648 :
649 :
650 9760 : if (started_tx)
651 : {
652 : /*
653 : * Even when the two_phase mode is requested by the user, it remains
654 : * as 'pending' until all tablesyncs have reached READY state.
655 : *
656 : * When this happens, we restart the apply worker and (if the
657 : * conditions are still ok) then the two_phase tri-state will become
658 : * 'enabled' at that time.
659 : *
660 : * Note: If the subscription has no tables then leave the state as
661 : * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
662 : * work.
663 : */
664 1696 : if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
665 : {
666 46 : CommandCounterIncrement(); /* make updates visible */
667 46 : if (AllTablesyncsReady())
668 : {
669 12 : ereport(LOG,
670 : (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
671 : MySubscription->name)));
672 12 : should_exit = true;
673 : }
674 : }
675 :
676 1696 : CommitTransactionCommand();
677 1696 : pgstat_report_stat(true);
678 : }
679 :
680 9760 : if (should_exit)
681 : {
682 : /*
683 : * Reset the last-start time for this worker so that the launcher will
684 : * restart it without waiting for wal_retrieve_retry_interval.
685 : */
686 12 : ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
687 :
688 12 : proc_exit(0);
689 : }
690 9748 : }
691 :
692 : /*
693 : * Process possible state change(s) of tables that are being synchronized.
694 : */
695 : void
696 10244 : process_syncing_tables(XLogRecPtr current_lsn)
697 : {
698 10244 : switch (MyLogicalRepWorker->type)
699 : {
700 42 : case WORKERTYPE_PARALLEL_APPLY:
701 :
702 : /*
703 : * Skip for parallel apply workers because they only operate on
704 : * tables that are in a READY state. See pa_can_start() and
705 : * should_apply_changes_for_rel().
706 : */
707 42 : break;
708 :
709 438 : case WORKERTYPE_TABLESYNC:
710 438 : process_syncing_tables_for_sync(current_lsn);
711 72 : break;
712 :
713 9764 : case WORKERTYPE_APPLY:
714 9764 : process_syncing_tables_for_apply(current_lsn);
715 9748 : break;
716 :
717 0 : case WORKERTYPE_UNKNOWN:
718 : /* Should never happen. */
719 0 : elog(ERROR, "Unknown worker type");
720 : }
721 9862 : }
722 :
723 : /*
724 : * Create list of columns for COPY based on logical relation mapping.
725 : */
726 : static List *
727 382 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
728 : {
729 382 : List *attnamelist = NIL;
730 : int i;
731 :
732 1026 : for (i = 0; i < rel->remoterel.natts; i++)
733 : {
734 644 : attnamelist = lappend(attnamelist,
735 644 : makeString(rel->remoterel.attnames[i]));
736 : }
737 :
738 :
739 382 : return attnamelist;
740 : }
741 :
742 : /*
743 : * Data source callback for the COPY FROM, which reads from the remote
744 : * connection and passes the data back to our local COPY.
745 : */
746 : static int
747 27964 : copy_read_data(void *outbuf, int minread, int maxread)
748 : {
749 27964 : int bytesread = 0;
750 : int avail;
751 :
752 : /* If there are some leftover data from previous read, use it. */
753 27964 : avail = copybuf->len - copybuf->cursor;
754 27964 : if (avail)
755 : {
756 0 : if (avail > maxread)
757 0 : avail = maxread;
758 0 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
759 0 : copybuf->cursor += avail;
760 0 : maxread -= avail;
761 0 : bytesread += avail;
762 : }
763 :
764 27994 : while (maxread > 0 && bytesread < minread)
765 : {
766 27994 : pgsocket fd = PGINVALID_SOCKET;
767 : int len;
768 27994 : char *buf = NULL;
769 :
770 : for (;;)
771 : {
772 : /* Try read the data. */
773 27994 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
774 :
775 27994 : CHECK_FOR_INTERRUPTS();
776 :
777 27994 : if (len == 0)
778 30 : break;
779 27964 : else if (len < 0)
780 27964 : return bytesread;
781 : else
782 : {
783 : /* Process the data */
784 27586 : copybuf->data = buf;
785 27586 : copybuf->len = len;
786 27586 : copybuf->cursor = 0;
787 :
788 27586 : avail = copybuf->len - copybuf->cursor;
789 27586 : if (avail > maxread)
790 0 : avail = maxread;
791 27586 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
792 27586 : outbuf = (char *) outbuf + avail;
793 27586 : copybuf->cursor += avail;
794 27586 : maxread -= avail;
795 27586 : bytesread += avail;
796 : }
797 :
798 27586 : if (maxread <= 0 || bytesread >= minread)
799 27586 : return bytesread;
800 : }
801 :
802 : /*
803 : * Wait for more data or latch.
804 : */
805 30 : (void) WaitLatchOrSocket(MyLatch,
806 : WL_SOCKET_READABLE | WL_LATCH_SET |
807 : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
808 : fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
809 :
810 30 : ResetLatch(MyLatch);
811 : }
812 :
813 0 : return bytesread;
814 : }
815 :
816 :
817 : /*
818 : * Get information about remote relation in similar fashion the RELATION
819 : * message provides during replication.
820 : *
821 : * This function also returns (a) the relation qualifications to be used in
822 : * the COPY command, and (b) whether the remote relation has published any
823 : * generated column.
824 : */
825 : static void
826 386 : fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
827 : List **qual, bool *gencol_published)
828 : {
829 : WalRcvExecResult *res;
830 : StringInfoData cmd;
831 : TupleTableSlot *slot;
832 386 : Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
833 386 : Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
834 386 : Oid qualRow[] = {TEXTOID};
835 : bool isnull;
836 : int natt;
837 386 : StringInfo pub_names = NULL;
838 386 : Bitmapset *included_cols = NULL;
839 386 : int server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
840 :
841 386 : lrel->nspname = nspname;
842 386 : lrel->relname = relname;
843 :
844 : /* First fetch Oid and replica identity. */
845 386 : initStringInfo(&cmd);
846 386 : appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
847 : " FROM pg_catalog.pg_class c"
848 : " INNER JOIN pg_catalog.pg_namespace n"
849 : " ON (c.relnamespace = n.oid)"
850 : " WHERE n.nspname = %s"
851 : " AND c.relname = %s",
852 : quote_literal_cstr(nspname),
853 : quote_literal_cstr(relname));
854 386 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
855 : lengthof(tableRow), tableRow);
856 :
857 386 : if (res->status != WALRCV_OK_TUPLES)
858 0 : ereport(ERROR,
859 : (errcode(ERRCODE_CONNECTION_FAILURE),
860 : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
861 : nspname, relname, res->err)));
862 :
863 386 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
864 386 : if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
865 0 : ereport(ERROR,
866 : (errcode(ERRCODE_UNDEFINED_OBJECT),
867 : errmsg("table \"%s.%s\" not found on publisher",
868 : nspname, relname)));
869 :
870 386 : lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
871 : Assert(!isnull);
872 386 : lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
873 : Assert(!isnull);
874 386 : lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
875 : Assert(!isnull);
876 :
877 386 : ExecDropSingleTupleTableSlot(slot);
878 386 : walrcv_clear_result(res);
879 :
880 :
881 : /*
882 : * Get column lists for each relation.
883 : *
884 : * We need to do this before fetching info about column names and types,
885 : * so that we can skip columns that should not be replicated.
886 : */
887 386 : if (server_version >= 150000)
888 : {
889 : WalRcvExecResult *pubres;
890 : TupleTableSlot *tslot;
891 386 : Oid attrsRow[] = {INT2VECTOROID};
892 :
893 : /* Build the pub_names comma-separated string. */
894 386 : pub_names = makeStringInfo();
895 386 : GetPublicationsStr(MySubscription->publications, pub_names, true);
896 :
897 : /*
898 : * Fetch info about column lists for the relation (from all the
899 : * publications).
900 : */
901 386 : resetStringInfo(&cmd);
902 386 : appendStringInfo(&cmd,
903 : "SELECT DISTINCT"
904 : " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
905 : " THEN NULL ELSE gpt.attrs END)"
906 : " FROM pg_publication p,"
907 : " LATERAL pg_get_publication_tables(p.pubname) gpt,"
908 : " pg_class c"
909 : " WHERE gpt.relid = %u AND c.oid = gpt.relid"
910 : " AND p.pubname IN ( %s )",
911 : lrel->remoteid,
912 : pub_names->data);
913 :
914 386 : pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
915 : lengthof(attrsRow), attrsRow);
916 :
917 386 : if (pubres->status != WALRCV_OK_TUPLES)
918 0 : ereport(ERROR,
919 : (errcode(ERRCODE_CONNECTION_FAILURE),
920 : errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
921 : nspname, relname, pubres->err)));
922 :
923 : /*
924 : * We don't support the case where the column list is different for
925 : * the same table when combining publications. See comments atop
926 : * fetch_table_list. So there should be only one row returned.
927 : * Although we already checked this when creating the subscription, we
928 : * still need to check here in case the column list was changed after
929 : * creating the subscription and before the sync worker is started.
930 : */
931 386 : if (tuplestore_tuple_count(pubres->tuplestore) > 1)
932 0 : ereport(ERROR,
933 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
934 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
935 : nspname, relname));
936 :
937 : /*
938 : * Get the column list and build a single bitmap with the attnums.
939 : *
940 : * If we find a NULL value, it means all the columns should be
941 : * replicated.
942 : */
943 386 : tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
944 386 : if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
945 : {
946 386 : Datum cfval = slot_getattr(tslot, 1, &isnull);
947 :
948 386 : if (!isnull)
949 : {
950 : ArrayType *arr;
951 : int nelems;
952 : int16 *elems;
953 :
954 44 : arr = DatumGetArrayTypeP(cfval);
955 44 : nelems = ARR_DIMS(arr)[0];
956 44 : elems = (int16 *) ARR_DATA_PTR(arr);
957 :
958 118 : for (natt = 0; natt < nelems; natt++)
959 74 : included_cols = bms_add_member(included_cols, elems[natt]);
960 : }
961 :
962 386 : ExecClearTuple(tslot);
963 : }
964 386 : ExecDropSingleTupleTableSlot(tslot);
965 :
966 386 : walrcv_clear_result(pubres);
967 : }
968 :
969 : /*
970 : * Now fetch column names and types.
971 : */
972 386 : resetStringInfo(&cmd);
973 386 : appendStringInfoString(&cmd,
974 : "SELECT a.attnum,"
975 : " a.attname,"
976 : " a.atttypid,"
977 : " a.attnum = ANY(i.indkey)");
978 :
979 : /* Generated columns can be replicated since version 18. */
980 386 : if (server_version >= 180000)
981 386 : appendStringInfoString(&cmd, ", a.attgenerated != ''");
982 :
983 772 : appendStringInfo(&cmd,
984 : " FROM pg_catalog.pg_attribute a"
985 : " LEFT JOIN pg_catalog.pg_index i"
986 : " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
987 : " WHERE a.attnum > 0::pg_catalog.int2"
988 : " AND NOT a.attisdropped %s"
989 : " AND a.attrelid = %u"
990 : " ORDER BY a.attnum",
991 : lrel->remoteid,
992 386 : (server_version >= 120000 && server_version < 180000 ?
993 : "AND a.attgenerated = ''" : ""),
994 : lrel->remoteid);
995 386 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
996 : server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
997 :
998 386 : if (res->status != WALRCV_OK_TUPLES)
999 0 : ereport(ERROR,
1000 : (errcode(ERRCODE_CONNECTION_FAILURE),
1001 : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
1002 : nspname, relname, res->err)));
1003 :
1004 : /* We don't know the number of rows coming, so allocate enough space. */
1005 386 : lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
1006 386 : lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
1007 386 : lrel->attkeys = NULL;
1008 :
1009 : /*
1010 : * Store the columns as a list of names. Ignore those that are not
1011 : * present in the column list, if there is one.
1012 : */
1013 386 : natt = 0;
1014 386 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1015 1104 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1016 : {
1017 : char *rel_colname;
1018 : AttrNumber attnum;
1019 :
1020 718 : attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
1021 : Assert(!isnull);
1022 :
1023 : /* If the column is not in the column list, skip it. */
1024 718 : if (included_cols != NULL && !bms_is_member(attnum, included_cols))
1025 : {
1026 62 : ExecClearTuple(slot);
1027 62 : continue;
1028 : }
1029 :
1030 656 : rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
1031 : Assert(!isnull);
1032 :
1033 656 : lrel->attnames[natt] = rel_colname;
1034 656 : lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
1035 : Assert(!isnull);
1036 :
1037 656 : if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
1038 214 : lrel->attkeys = bms_add_member(lrel->attkeys, natt);
1039 :
1040 : /* Remember if the remote table has published any generated column. */
1041 656 : if (server_version >= 180000 && !(*gencol_published))
1042 : {
1043 656 : *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
1044 : Assert(!isnull);
1045 : }
1046 :
1047 : /* Should never happen. */
1048 656 : if (++natt >= MaxTupleAttributeNumber)
1049 0 : elog(ERROR, "too many columns in remote table \"%s.%s\"",
1050 : nspname, relname);
1051 :
1052 656 : ExecClearTuple(slot);
1053 : }
1054 386 : ExecDropSingleTupleTableSlot(slot);
1055 :
1056 386 : lrel->natts = natt;
1057 :
1058 386 : walrcv_clear_result(res);
1059 :
1060 : /*
1061 : * Get relation's row filter expressions. DISTINCT avoids the same
1062 : * expression of a table in multiple publications from being included
1063 : * multiple times in the final expression.
1064 : *
1065 : * We need to copy the row even if it matches just one of the
1066 : * publications, so we later combine all the quals with OR.
1067 : *
1068 : * For initial synchronization, row filtering can be ignored in following
1069 : * cases:
1070 : *
1071 : * 1) one of the subscribed publications for the table hasn't specified
1072 : * any row filter
1073 : *
1074 : * 2) one of the subscribed publications has puballtables set to true
1075 : *
1076 : * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
1077 : * that includes this relation
1078 : */
1079 386 : if (server_version >= 150000)
1080 : {
1081 : /* Reuse the already-built pub_names. */
1082 : Assert(pub_names != NULL);
1083 :
1084 : /* Check for row filters. */
1085 386 : resetStringInfo(&cmd);
1086 386 : appendStringInfo(&cmd,
1087 : "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1088 : " FROM pg_publication p,"
1089 : " LATERAL pg_get_publication_tables(p.pubname) gpt"
1090 : " WHERE gpt.relid = %u"
1091 : " AND p.pubname IN ( %s )",
1092 : lrel->remoteid,
1093 : pub_names->data);
1094 :
1095 386 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
1096 :
1097 386 : if (res->status != WALRCV_OK_TUPLES)
1098 0 : ereport(ERROR,
1099 : (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1100 : nspname, relname, res->err)));
1101 :
1102 : /*
1103 : * Multiple row filter expressions for the same table will be combined
1104 : * by COPY using OR. If any of the filter expressions for this table
1105 : * are null, it means the whole table will be copied. In this case it
1106 : * is not necessary to construct a unified row filter expression at
1107 : * all.
1108 : */
1109 386 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1110 416 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1111 : {
1112 394 : Datum rf = slot_getattr(slot, 1, &isnull);
1113 :
1114 394 : if (!isnull)
1115 30 : *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1116 : else
1117 : {
1118 : /* Ignore filters and cleanup as necessary. */
1119 364 : if (*qual)
1120 : {
1121 6 : list_free_deep(*qual);
1122 6 : *qual = NIL;
1123 : }
1124 364 : break;
1125 : }
1126 :
1127 30 : ExecClearTuple(slot);
1128 : }
1129 386 : ExecDropSingleTupleTableSlot(slot);
1130 :
1131 386 : walrcv_clear_result(res);
1132 386 : destroyStringInfo(pub_names);
1133 : }
1134 :
1135 386 : pfree(cmd.data);
1136 386 : }
1137 :
1138 : /*
1139 : * Copy existing data of a table from publisher.
1140 : *
1141 : * Caller is responsible for locking the local relation.
1142 : */
1143 : static void
1144 386 : copy_table(Relation rel)
1145 : {
1146 : LogicalRepRelMapEntry *relmapentry;
1147 : LogicalRepRelation lrel;
1148 386 : List *qual = NIL;
1149 : WalRcvExecResult *res;
1150 : StringInfoData cmd;
1151 : CopyFromState cstate;
1152 : List *attnamelist;
1153 : ParseState *pstate;
1154 386 : List *options = NIL;
1155 386 : bool gencol_published = false;
1156 :
1157 : /* Get the publisher relation info. */
1158 386 : fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
1159 386 : RelationGetRelationName(rel), &lrel, &qual,
1160 : &gencol_published);
1161 :
1162 : /* Put the relation into relmap. */
1163 386 : logicalrep_relmap_update(&lrel);
1164 :
1165 : /* Map the publisher relation to local one. */
1166 386 : relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
1167 : Assert(rel == relmapentry->localrel);
1168 :
1169 : /* Start copy on the publisher. */
1170 382 : initStringInfo(&cmd);
1171 :
1172 : /* Regular table with no row filter or generated columns */
1173 382 : if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_published)
1174 : {
1175 326 : appendStringInfo(&cmd, "COPY %s",
1176 326 : quote_qualified_identifier(lrel.nspname, lrel.relname));
1177 :
1178 : /* If the table has columns, then specify the columns */
1179 326 : if (lrel.natts)
1180 : {
1181 324 : appendStringInfoString(&cmd, " (");
1182 :
1183 : /*
1184 : * XXX Do we need to list the columns in all cases? Maybe we're
1185 : * replicating all columns?
1186 : */
1187 884 : for (int i = 0; i < lrel.natts; i++)
1188 : {
1189 560 : if (i > 0)
1190 236 : appendStringInfoString(&cmd, ", ");
1191 :
1192 560 : appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1193 : }
1194 :
1195 324 : appendStringInfoChar(&cmd, ')');
1196 : }
1197 :
1198 326 : appendStringInfoString(&cmd, " TO STDOUT");
1199 : }
1200 : else
1201 : {
1202 : /*
1203 : * For non-tables and tables with row filters, we need to do COPY
1204 : * (SELECT ...), but we can't just do SELECT * because we may need to
1205 : * copy only subset of columns including generated columns. For tables
1206 : * with any row filters, build a SELECT query with OR'ed row filters
1207 : * for COPY.
1208 : *
1209 : * We also need to use this same COPY (SELECT ...) syntax when
1210 : * generated columns are published, because copy of generated columns
1211 : * is not supported by the normal COPY.
1212 : */
1213 56 : appendStringInfoString(&cmd, "COPY (SELECT ");
1214 140 : for (int i = 0; i < lrel.natts; i++)
1215 : {
1216 84 : appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1217 84 : if (i < lrel.natts - 1)
1218 28 : appendStringInfoString(&cmd, ", ");
1219 : }
1220 :
1221 56 : appendStringInfoString(&cmd, " FROM ");
1222 :
1223 : /*
1224 : * For regular tables, make sure we don't copy data from a child that
1225 : * inherits the named table as those will be copied separately.
1226 : */
1227 56 : if (lrel.relkind == RELKIND_RELATION)
1228 22 : appendStringInfoString(&cmd, "ONLY ");
1229 :
1230 56 : appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
1231 : /* list of OR'ed filters */
1232 56 : if (qual != NIL)
1233 : {
1234 : ListCell *lc;
1235 22 : char *q = strVal(linitial(qual));
1236 :
1237 22 : appendStringInfo(&cmd, " WHERE %s", q);
1238 24 : for_each_from(lc, qual, 1)
1239 : {
1240 2 : q = strVal(lfirst(lc));
1241 2 : appendStringInfo(&cmd, " OR %s", q);
1242 : }
1243 22 : list_free_deep(qual);
1244 : }
1245 :
1246 56 : appendStringInfoString(&cmd, ") TO STDOUT");
1247 : }
1248 :
1249 : /*
1250 : * Prior to v16, initial table synchronization will use text format even
1251 : * if the binary option is enabled for a subscription.
1252 : */
1253 382 : if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
1254 382 : MySubscription->binary)
1255 : {
1256 10 : appendStringInfoString(&cmd, " WITH (FORMAT binary)");
1257 10 : options = list_make1(makeDefElem("format",
1258 : (Node *) makeString("binary"), -1));
1259 : }
1260 :
1261 382 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
1262 382 : pfree(cmd.data);
1263 382 : if (res->status != WALRCV_OK_COPY_OUT)
1264 0 : ereport(ERROR,
1265 : (errcode(ERRCODE_CONNECTION_FAILURE),
1266 : errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1267 : lrel.nspname, lrel.relname, res->err)));
1268 382 : walrcv_clear_result(res);
1269 :
1270 382 : copybuf = makeStringInfo();
1271 :
1272 382 : pstate = make_parsestate(NULL);
1273 382 : (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
1274 : NULL, false, false);
1275 :
1276 382 : attnamelist = make_copy_attnamelist(relmapentry);
1277 382 : cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
1278 :
1279 : /* Do the copy */
1280 380 : (void) CopyFrom(cstate);
1281 :
1282 366 : logicalrep_rel_close(relmapentry, NoLock);
1283 366 : }
1284 :
1285 : /*
1286 : * Determine the tablesync slot name.
1287 : *
1288 : * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
1289 : * on slot name length. We append system_identifier to avoid slot_name
1290 : * collision with subscriptions in other clusters. With the current scheme
1291 : * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
1292 : * length of slot_name will be 50.
1293 : *
1294 : * The returned slot name is stored in the supplied buffer (syncslotname) with
1295 : * the given size.
1296 : *
1297 : * Note: We don't use the subscription slot name as part of tablesync slot name
1298 : * because we are responsible for cleaning up these slots and it could become
1299 : * impossible to recalculate what name to cleanup if the subscription slot name
1300 : * had changed.
1301 : */
1302 : void
1303 762 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
1304 : char *syncslotname, Size szslot)
1305 : {
1306 762 : snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1307 : relid, GetSystemIdentifier());
1308 762 : }
1309 :
1310 : /*
1311 : * Start syncing the table in the sync worker.
1312 : *
1313 : * If nothing needs to be done to sync the table, we exit the worker without
1314 : * any further action.
1315 : *
1316 : * The returned slot name is palloc'ed in current memory context.
1317 : */
1318 : static char *
1319 386 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
1320 : {
1321 : char *slotname;
1322 : char *err;
1323 : char relstate;
1324 : XLogRecPtr relstate_lsn;
1325 : Relation rel;
1326 : AclResult aclresult;
1327 : WalRcvExecResult *res;
1328 : char originname[NAMEDATALEN];
1329 : RepOriginId originid;
1330 : UserContext ucxt;
1331 : bool must_use_password;
1332 : bool run_as_owner;
1333 :
1334 : /* Check the state of the table synchronization. */
1335 386 : StartTransactionCommand();
1336 386 : relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
1337 386 : MyLogicalRepWorker->relid,
1338 : &relstate_lsn);
1339 386 : CommitTransactionCommand();
1340 :
1341 : /* Is the use of a password mandatory? */
1342 764 : must_use_password = MySubscription->passwordrequired &&
1343 378 : !MySubscription->ownersuperuser;
1344 :
1345 386 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1346 386 : MyLogicalRepWorker->relstate = relstate;
1347 386 : MyLogicalRepWorker->relstate_lsn = relstate_lsn;
1348 386 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1349 :
1350 : /*
1351 : * If synchronization is already done or no longer necessary, exit now
1352 : * that we've updated shared memory state.
1353 : */
1354 386 : switch (relstate)
1355 : {
1356 0 : case SUBREL_STATE_SYNCDONE:
1357 : case SUBREL_STATE_READY:
1358 : case SUBREL_STATE_UNKNOWN:
1359 0 : finish_sync_worker(); /* doesn't return */
1360 : }
1361 :
1362 : /* Calculate the name of the tablesync slot. */
1363 386 : slotname = (char *) palloc(NAMEDATALEN);
1364 386 : ReplicationSlotNameForTablesync(MySubscription->oid,
1365 386 : MyLogicalRepWorker->relid,
1366 : slotname,
1367 : NAMEDATALEN);
1368 :
1369 : /*
1370 : * Here we use the slot name instead of the subscription name as the
1371 : * application_name, so that it is different from the leader apply worker,
1372 : * so that synchronous replication can distinguish them.
1373 : */
1374 386 : LogRepWorkerWalRcvConn =
1375 386 : walrcv_connect(MySubscription->conninfo, true, true,
1376 : must_use_password,
1377 : slotname, &err);
1378 386 : if (LogRepWorkerWalRcvConn == NULL)
1379 0 : ereport(ERROR,
1380 : (errcode(ERRCODE_CONNECTION_FAILURE),
1381 : errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
1382 : MySubscription->name, err)));
1383 :
1384 : Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
1385 : MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
1386 : MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
1387 :
1388 : /* Assign the origin tracking record name. */
1389 386 : ReplicationOriginNameForLogicalRep(MySubscription->oid,
1390 386 : MyLogicalRepWorker->relid,
1391 : originname,
1392 : sizeof(originname));
1393 :
1394 386 : if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
1395 : {
1396 : /*
1397 : * We have previously errored out before finishing the copy so the
1398 : * replication slot might exist. We want to remove the slot if it
1399 : * already exists and proceed.
1400 : *
1401 : * XXX We could also instead try to drop the slot, last time we failed
1402 : * but for that, we might need to clean up the copy state as it might
1403 : * be in the middle of fetching the rows. Also, if there is a network
1404 : * breakdown then it wouldn't have succeeded so trying it next time
1405 : * seems like a better bet.
1406 : */
1407 12 : ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
1408 : }
1409 374 : else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
1410 : {
1411 : /*
1412 : * The COPY phase was previously done, but tablesync then crashed
1413 : * before it was able to finish normally.
1414 : */
1415 0 : StartTransactionCommand();
1416 :
1417 : /*
1418 : * The origin tracking name must already exist. It was created first
1419 : * time this tablesync was launched.
1420 : */
1421 0 : originid = replorigin_by_name(originname, false);
1422 0 : replorigin_session_setup(originid, 0);
1423 0 : replorigin_session_origin = originid;
1424 0 : *origin_startpos = replorigin_session_get_progress(false);
1425 :
1426 0 : CommitTransactionCommand();
1427 :
1428 0 : goto copy_table_done;
1429 : }
1430 :
1431 386 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1432 386 : MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
1433 386 : MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
1434 386 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1435 :
1436 : /* Update the state and make it visible to others. */
1437 386 : StartTransactionCommand();
1438 386 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1439 386 : MyLogicalRepWorker->relid,
1440 386 : MyLogicalRepWorker->relstate,
1441 386 : MyLogicalRepWorker->relstate_lsn,
1442 : false);
1443 386 : CommitTransactionCommand();
1444 386 : pgstat_report_stat(true);
1445 :
1446 386 : StartTransactionCommand();
1447 :
1448 : /*
1449 : * Use a standard write lock here. It might be better to disallow access
1450 : * to the table while it's being synchronized. But we don't want to block
1451 : * the main apply process from working and it has to open the relation in
1452 : * RowExclusiveLock when remapping remote relation id to local one.
1453 : */
1454 386 : rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
1455 :
1456 : /*
1457 : * Start a transaction in the remote node in REPEATABLE READ mode. This
1458 : * ensures that both the replication slot we create (see below) and the
1459 : * COPY are consistent with each other.
1460 : */
1461 386 : res = walrcv_exec(LogRepWorkerWalRcvConn,
1462 : "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1463 : 0, NULL);
1464 386 : if (res->status != WALRCV_OK_COMMAND)
1465 0 : ereport(ERROR,
1466 : (errcode(ERRCODE_CONNECTION_FAILURE),
1467 : errmsg("table copy could not start transaction on publisher: %s",
1468 : res->err)));
1469 386 : walrcv_clear_result(res);
1470 :
1471 : /*
1472 : * Create a new permanent logical decoding slot. This slot will be used
1473 : * for the catchup phase after COPY is done, so tell it to use the
1474 : * snapshot to make the final data consistent.
1475 : */
1476 386 : walrcv_create_slot(LogRepWorkerWalRcvConn,
1477 : slotname, false /* permanent */ , false /* two_phase */ ,
1478 : MySubscription->failover,
1479 : CRS_USE_SNAPSHOT, origin_startpos);
1480 :
1481 : /*
1482 : * Setup replication origin tracking. The purpose of doing this before the
1483 : * copy is to avoid doing the copy again due to any error in setting up
1484 : * origin tracking.
1485 : */
1486 386 : originid = replorigin_by_name(originname, true);
1487 386 : if (!OidIsValid(originid))
1488 : {
1489 : /*
1490 : * Origin tracking does not exist, so create it now.
1491 : *
1492 : * Then advance to the LSN got from walrcv_create_slot. This is WAL
1493 : * logged for the purpose of recovery. Locks are to prevent the
1494 : * replication origin from vanishing while advancing.
1495 : */
1496 386 : originid = replorigin_create(originname);
1497 :
1498 386 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1499 386 : replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
1500 : true /* go backward */ , true /* WAL log */ );
1501 386 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1502 :
1503 386 : replorigin_session_setup(originid, 0);
1504 386 : replorigin_session_origin = originid;
1505 : }
1506 : else
1507 : {
1508 0 : ereport(ERROR,
1509 : (errcode(ERRCODE_DUPLICATE_OBJECT),
1510 : errmsg("replication origin \"%s\" already exists",
1511 : originname)));
1512 : }
1513 :
1514 : /*
1515 : * Make sure that the copy command runs as the table owner, unless the
1516 : * user has opted out of that behaviour.
1517 : */
1518 386 : run_as_owner = MySubscription->runasowner;
1519 386 : if (!run_as_owner)
1520 384 : SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
1521 :
1522 : /*
1523 : * Check that our table sync worker has permission to insert into the
1524 : * target table.
1525 : */
1526 386 : aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1527 : ACL_INSERT);
1528 386 : if (aclresult != ACLCHECK_OK)
1529 0 : aclcheck_error(aclresult,
1530 0 : get_relkind_objtype(rel->rd_rel->relkind),
1531 0 : RelationGetRelationName(rel));
1532 :
1533 : /*
1534 : * COPY FROM does not honor RLS policies. That is not a problem for
1535 : * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
1536 : * who has it implicitly), but other roles should not be able to
1537 : * circumvent RLS. Disallow logical replication into RLS enabled
1538 : * relations for such roles.
1539 : */
1540 386 : if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
1541 0 : ereport(ERROR,
1542 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1543 : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1544 : GetUserNameFromId(GetUserId(), true),
1545 : RelationGetRelationName(rel))));
1546 :
1547 : /* Now do the initial data copy */
1548 386 : PushActiveSnapshot(GetTransactionSnapshot());
1549 386 : copy_table(rel);
1550 366 : PopActiveSnapshot();
1551 :
1552 366 : res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
1553 366 : if (res->status != WALRCV_OK_COMMAND)
1554 0 : ereport(ERROR,
1555 : (errcode(ERRCODE_CONNECTION_FAILURE),
1556 : errmsg("table copy could not finish transaction on publisher: %s",
1557 : res->err)));
1558 366 : walrcv_clear_result(res);
1559 :
1560 366 : if (!run_as_owner)
1561 364 : RestoreUserContext(&ucxt);
1562 :
1563 366 : table_close(rel, NoLock);
1564 :
1565 : /* Make the copy visible. */
1566 366 : CommandCounterIncrement();
1567 :
1568 : /*
1569 : * Update the persisted state to indicate the COPY phase is done; make it
1570 : * visible to others.
1571 : */
1572 366 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1573 366 : MyLogicalRepWorker->relid,
1574 : SUBREL_STATE_FINISHEDCOPY,
1575 366 : MyLogicalRepWorker->relstate_lsn,
1576 : false);
1577 :
1578 366 : CommitTransactionCommand();
1579 :
1580 366 : copy_table_done:
1581 :
1582 366 : elog(DEBUG1,
1583 : "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
1584 : originname, LSN_FORMAT_ARGS(*origin_startpos));
1585 :
1586 : /*
1587 : * We are done with the initial data synchronization, update the state.
1588 : */
1589 366 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1590 366 : MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
1591 366 : MyLogicalRepWorker->relstate_lsn = *origin_startpos;
1592 366 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1593 :
1594 : /*
1595 : * Finally, wait until the leader apply worker tells us to catch up and
1596 : * then return to let LogicalRepApplyLoop do it.
1597 : */
1598 366 : wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
1599 366 : return slotname;
1600 : }
1601 :
1602 : /*
1603 : * Common code to fetch the up-to-date sync state info into the static lists.
1604 : *
1605 : * Returns true if subscription has 1 or more tables, else false.
1606 : *
1607 : * Note: If this function started the transaction (indicated by the parameter)
1608 : * then it is the caller's responsibility to commit it.
1609 : */
1610 : static bool
1611 10022 : FetchTableStates(bool *started_tx)
1612 : {
1613 : static bool has_subrels = false;
1614 :
1615 10022 : *started_tx = false;
1616 :
1617 10022 : if (table_states_validity != SYNC_TABLE_STATE_VALID)
1618 : {
1619 : MemoryContext oldctx;
1620 : List *rstates;
1621 : ListCell *lc;
1622 : SubscriptionRelState *rstate;
1623 :
1624 1754 : table_states_validity = SYNC_TABLE_STATE_REBUILD_STARTED;
1625 :
1626 : /* Clean the old lists. */
1627 1754 : list_free_deep(table_states_not_ready);
1628 1754 : table_states_not_ready = NIL;
1629 :
1630 1754 : if (!IsTransactionState())
1631 : {
1632 1718 : StartTransactionCommand();
1633 1718 : *started_tx = true;
1634 : }
1635 :
1636 : /* Fetch all non-ready tables. */
1637 1754 : rstates = GetSubscriptionRelations(MySubscription->oid, true);
1638 :
1639 : /* Allocate the tracking info in a permanent memory context. */
1640 1754 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1641 4664 : foreach(lc, rstates)
1642 : {
1643 2910 : rstate = palloc(sizeof(SubscriptionRelState));
1644 2910 : memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
1645 2910 : table_states_not_ready = lappend(table_states_not_ready, rstate);
1646 : }
1647 1754 : MemoryContextSwitchTo(oldctx);
1648 :
1649 : /*
1650 : * Does the subscription have tables?
1651 : *
1652 : * If there were not-READY relations found then we know it does. But
1653 : * if table_states_not_ready was empty we still need to check again to
1654 : * see if there are 0 tables.
1655 : */
1656 2262 : has_subrels = (table_states_not_ready != NIL) ||
1657 508 : HasSubscriptionRelations(MySubscription->oid);
1658 :
1659 : /*
1660 : * If the subscription relation cache has been invalidated since we
1661 : * entered this routine, we still use and return the relations we just
1662 : * finished constructing, to avoid infinite loops, but we leave the
1663 : * table states marked as stale so that we'll rebuild it again on next
1664 : * access. Otherwise, we mark the table states as valid.
1665 : */
1666 1754 : if (table_states_validity == SYNC_TABLE_STATE_REBUILD_STARTED)
1667 1754 : table_states_validity = SYNC_TABLE_STATE_VALID;
1668 : }
1669 :
1670 10022 : return has_subrels;
1671 : }
1672 :
1673 : /*
1674 : * Execute the initial sync with error handling. Disable the subscription,
1675 : * if it's required.
1676 : *
1677 : * Allocate the slot name in long-lived context on return. Note that we don't
1678 : * handle FATAL errors which are probably because of system resource error and
1679 : * are not repeatable.
1680 : */
1681 : static void
1682 386 : start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
1683 : {
1684 386 : char *sync_slotname = NULL;
1685 :
1686 : Assert(am_tablesync_worker());
1687 :
1688 386 : PG_TRY();
1689 : {
1690 : /* Call initial sync. */
1691 386 : sync_slotname = LogicalRepSyncTableStart(origin_startpos);
1692 : }
1693 20 : PG_CATCH();
1694 : {
1695 20 : if (MySubscription->disableonerr)
1696 2 : DisableSubscriptionAndExit();
1697 : else
1698 : {
1699 : /*
1700 : * Report the worker failed during table synchronization. Abort
1701 : * the current transaction so that the stats message is sent in an
1702 : * idle state.
1703 : */
1704 18 : AbortOutOfAnyTransaction();
1705 18 : pgstat_report_subscription_error(MySubscription->oid, false);
1706 :
1707 18 : PG_RE_THROW();
1708 : }
1709 : }
1710 366 : PG_END_TRY();
1711 :
1712 : /* allocate slot name in long-lived context */
1713 366 : *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
1714 366 : pfree(sync_slotname);
1715 366 : }
1716 :
1717 : /*
1718 : * Runs the tablesync worker.
1719 : *
1720 : * It starts syncing tables. After a successful sync, sets streaming options
1721 : * and starts streaming to catchup with apply worker.
1722 : */
1723 : static void
1724 386 : run_tablesync_worker()
1725 : {
1726 : char originname[NAMEDATALEN];
1727 386 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
1728 386 : char *slotname = NULL;
1729 : WalRcvStreamOptions options;
1730 :
1731 386 : start_table_sync(&origin_startpos, &slotname);
1732 :
1733 366 : ReplicationOriginNameForLogicalRep(MySubscription->oid,
1734 366 : MyLogicalRepWorker->relid,
1735 : originname,
1736 : sizeof(originname));
1737 :
1738 366 : set_apply_error_context_origin(originname);
1739 :
1740 366 : set_stream_options(&options, slotname, &origin_startpos);
1741 :
1742 366 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
1743 :
1744 : /* Apply the changes till we catchup with the apply worker. */
1745 366 : start_apply(origin_startpos);
1746 0 : }
1747 :
1748 : /* Logical Replication Tablesync worker entry point */
1749 : void
1750 390 : TablesyncWorkerMain(Datum main_arg)
1751 : {
1752 390 : int worker_slot = DatumGetInt32(main_arg);
1753 :
1754 390 : SetupApplyOrSyncWorker(worker_slot);
1755 :
1756 386 : run_tablesync_worker();
1757 :
1758 0 : finish_sync_worker();
1759 : }
1760 :
1761 : /*
1762 : * If the subscription has no tables then return false.
1763 : *
1764 : * Otherwise, are all tablesyncs READY?
1765 : *
1766 : * Note: This function is not suitable to be called from outside of apply or
1767 : * tablesync workers because MySubscription needs to be already initialized.
1768 : */
1769 : bool
1770 258 : AllTablesyncsReady(void)
1771 : {
1772 258 : bool started_tx = false;
1773 258 : bool has_subrels = false;
1774 :
1775 : /* We need up-to-date sync state info for subscription tables here. */
1776 258 : has_subrels = FetchTableStates(&started_tx);
1777 :
1778 258 : if (started_tx)
1779 : {
1780 36 : CommitTransactionCommand();
1781 36 : pgstat_report_stat(true);
1782 : }
1783 :
1784 : /*
1785 : * Return false when there are no tables in subscription or not all tables
1786 : * are in ready state; true otherwise.
1787 : */
1788 258 : return has_subrels && (table_states_not_ready == NIL);
1789 : }
1790 :
1791 : /*
1792 : * Update the two_phase state of the specified subscription in pg_subscription.
1793 : */
1794 : void
1795 20 : UpdateTwoPhaseState(Oid suboid, char new_state)
1796 : {
1797 : Relation rel;
1798 : HeapTuple tup;
1799 : bool nulls[Natts_pg_subscription];
1800 : bool replaces[Natts_pg_subscription];
1801 : Datum values[Natts_pg_subscription];
1802 :
1803 : Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
1804 : new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1805 : new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1806 :
1807 20 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1808 20 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
1809 20 : if (!HeapTupleIsValid(tup))
1810 0 : elog(ERROR,
1811 : "cache lookup failed for subscription oid %u",
1812 : suboid);
1813 :
1814 : /* Form a new tuple. */
1815 20 : memset(values, 0, sizeof(values));
1816 20 : memset(nulls, false, sizeof(nulls));
1817 20 : memset(replaces, false, sizeof(replaces));
1818 :
1819 : /* And update/set two_phase state */
1820 20 : values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1821 20 : replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1822 :
1823 20 : tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1824 : values, nulls, replaces);
1825 20 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1826 :
1827 20 : heap_freetuple(tup);
1828 20 : table_close(rel, RowExclusiveLock);
1829 20 : }
|