Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * tablesync.c
3 : * PostgreSQL logical replication: initial table data synchronization
4 : *
5 : * Copyright (c) 2012-2026, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/tablesync.c
9 : *
10 : * NOTES
11 : * This file contains code for initial table data synchronization for
12 : * logical replication.
13 : *
14 : * The initial data synchronization is done separately for each table,
15 : * in a separate apply worker that only fetches the initial snapshot data
16 : * from the publisher and then synchronizes the position in the stream with
17 : * the leader apply worker.
18 : *
19 : * There are several reasons for doing the synchronization this way:
20 : * - It allows us to parallelize the initial data synchronization
21 : * which lowers the time needed for it to happen.
22 : * - The initial synchronization does not have to hold the xid and LSN
23 : * for the time it takes to copy data of all tables, causing less
24 : * bloat and lower disk consumption compared to doing the
25 : * synchronization in a single process for the whole database.
26 : * - It allows us to synchronize any tables added after the initial
27 : * synchronization has finished.
28 : *
29 : * The stream position synchronization works in multiple steps:
30 : * - Apply worker requests a tablesync worker to start, setting the new
31 : * table state to INIT.
32 : * - Tablesync worker starts; changes table state from INIT to DATASYNC while
33 : * copying.
34 : * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
35 : * worker specific) state to indicate when the copy phase has completed, so
36 : * if the worker crashes with this (non-memory) state then the copy will not
37 : * be re-attempted.
38 : * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
39 : * - Apply worker periodically checks for tables in SYNCWAIT state. When
40 : * any appear, it sets the table state to CATCHUP and starts loop-waiting
41 : * until either the table state is set to SYNCDONE or the sync worker
42 : * exits.
43 : * - After the sync worker has seen the state change to CATCHUP, it will
44 : * read the stream and apply changes (acting like an apply worker) until
45 : * it catches up to the specified stream position. Then it sets the
46 : * state to SYNCDONE. There might be zero changes applied between
47 : * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
48 : * apply worker.
49 : * - Once the state is set to SYNCDONE, the apply will continue tracking
50 : * the table until it reaches the SYNCDONE stream position, at which
51 : * point it sets state to READY and stops tracking. Again, there might
52 : * be zero changes in between.
53 : *
54 : * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
55 : * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
56 : *
57 : * The catalog pg_subscription_rel is used to keep information about
58 : * subscribed tables and their state. The catalog holds all states
59 : * except SYNCWAIT and CATCHUP which are only in shared memory.
60 : *
61 : * Example flows look like this:
62 : * - Apply is in front:
63 : * sync:8
64 : * -> set in catalog FINISHEDCOPY
65 : * -> set in memory SYNCWAIT
66 : * apply:10
67 : * -> set in memory CATCHUP
68 : * -> enter wait-loop
69 : * sync:10
70 : * -> set in catalog SYNCDONE
71 : * -> exit
72 : * apply:10
73 : * -> exit wait-loop
74 : * -> continue rep
75 : * apply:11
76 : * -> set in catalog READY
77 : *
78 : * - Sync is in front:
79 : * sync:10
80 : * -> set in catalog FINISHEDCOPY
81 : * -> set in memory SYNCWAIT
82 : * apply:8
83 : * -> set in memory CATCHUP
84 : * -> continue per-table filtering
85 : * sync:10
86 : * -> set in catalog SYNCDONE
87 : * -> exit
88 : * apply:10
89 : * -> set in catalog READY
90 : * -> stop per-table filtering
91 : * -> continue rep
92 : *-------------------------------------------------------------------------
93 : */
94 :
95 : #include "postgres.h"
96 :
97 : #include "access/table.h"
98 : #include "access/xact.h"
99 : #include "catalog/indexing.h"
100 : #include "catalog/pg_subscription_rel.h"
101 : #include "catalog/pg_type.h"
102 : #include "commands/copy.h"
103 : #include "miscadmin.h"
104 : #include "nodes/makefuncs.h"
105 : #include "parser/parse_relation.h"
106 : #include "pgstat.h"
107 : #include "replication/logicallauncher.h"
108 : #include "replication/logicalrelation.h"
109 : #include "replication/logicalworker.h"
110 : #include "replication/origin.h"
111 : #include "replication/slot.h"
112 : #include "replication/walreceiver.h"
113 : #include "replication/worker_internal.h"
114 : #include "storage/ipc.h"
115 : #include "storage/latch.h"
116 : #include "storage/lmgr.h"
117 : #include "utils/acl.h"
118 : #include "utils/array.h"
119 : #include "utils/builtins.h"
120 : #include "utils/lsyscache.h"
121 : #include "utils/rls.h"
122 : #include "utils/snapmgr.h"
123 : #include "utils/syscache.h"
124 : #include "utils/usercontext.h"
125 :
126 : List *table_states_not_ready = NIL;
127 :
128 : static StringInfo copybuf = NULL;
129 :
130 : /*
131 : * Wait until the relation sync state is set in the catalog to the expected
132 : * one; return true when it happens.
133 : *
134 : * Returns false if the table sync worker or the table itself have
135 : * disappeared, or the table state has been reset.
136 : *
137 : * Currently, this is used in the apply worker when transitioning from
138 : * CATCHUP state to SYNCDONE.
139 : */
140 : static bool
141 190 : wait_for_table_state_change(Oid relid, char expected_state)
142 : {
143 : char state;
144 :
145 : for (;;)
146 218 : {
147 : LogicalRepWorker *worker;
148 : XLogRecPtr statelsn;
149 :
150 408 : CHECK_FOR_INTERRUPTS();
151 :
152 408 : InvalidateCatalogSnapshot();
153 408 : state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
154 : relid, &statelsn);
155 :
156 408 : if (state == SUBREL_STATE_UNKNOWN)
157 0 : break;
158 :
159 408 : if (state == expected_state)
160 190 : return true;
161 :
162 : /* Check if the sync worker is still running and bail if not. */
163 218 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
164 218 : worker = logicalrep_worker_find(WORKERTYPE_TABLESYNC,
165 218 : MyLogicalRepWorker->subid, relid,
166 : false);
167 218 : LWLockRelease(LogicalRepWorkerLock);
168 218 : if (!worker)
169 0 : break;
170 :
171 218 : (void) WaitLatch(MyLatch,
172 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
173 : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
174 :
175 218 : ResetLatch(MyLatch);
176 : }
177 :
178 0 : return false;
179 : }
180 :
181 : /*
182 : * Wait until the apply worker changes the state of our synchronization
183 : * worker to the expected one.
184 : *
185 : * Used when transitioning from SYNCWAIT state to CATCHUP.
186 : *
187 : * Returns false if the apply worker has disappeared.
188 : */
189 : static bool
190 192 : wait_for_worker_state_change(char expected_state)
191 : {
192 : int rc;
193 :
194 : for (;;)
195 193 : {
196 : LogicalRepWorker *worker;
197 :
198 385 : CHECK_FOR_INTERRUPTS();
199 :
200 : /*
201 : * Done if already in correct state. (We assume this fetch is atomic
202 : * enough to not give a misleading answer if we do it with no lock.)
203 : */
204 385 : if (MyLogicalRepWorker->relstate == expected_state)
205 192 : return true;
206 :
207 : /*
208 : * Bail out if the apply worker has died, else signal it we're
209 : * waiting.
210 : */
211 193 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
212 193 : worker = logicalrep_worker_find(WORKERTYPE_APPLY,
213 193 : MyLogicalRepWorker->subid, InvalidOid,
214 : false);
215 193 : if (worker && worker->proc)
216 193 : logicalrep_worker_wakeup_ptr(worker);
217 193 : LWLockRelease(LogicalRepWorkerLock);
218 193 : if (!worker)
219 0 : break;
220 :
221 : /*
222 : * Wait. We expect to get a latch signal back from the apply worker,
223 : * but use a timeout in case it dies without sending one.
224 : */
225 193 : rc = WaitLatch(MyLatch,
226 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
227 : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
228 :
229 193 : if (rc & WL_LATCH_SET)
230 193 : ResetLatch(MyLatch);
231 : }
232 :
233 0 : return false;
234 : }
235 :
/*
 * Handle table synchronization cooperation from the synchronization
 * worker.
 *
 * If the sync worker is in CATCHUP state and reached (or passed) the
 * predetermined synchronization point in the WAL stream, mark the table as
 * SYNCDONE and finish (this function does not return in that case; see
 * FinishSyncWorker).  Otherwise it is a no-op.
 */
void
ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
{
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);

	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
		current_lsn >= MyLogicalRepWorker->relstate_lsn)
	{
		TimeLineID	tli;
		char		syncslotname[NAMEDATALEN] = {0};
		char		originname[NAMEDATALEN] = {0};

		/* Update the in-memory state first, while holding the spinlock. */
		MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
		MyLogicalRepWorker->relstate_lsn = current_lsn;

		/*
		 * Release the mutex before doing any transactional/catalog work; the
		 * spinlock must only protect the brief in-memory update above.
		 */
		SpinLockRelease(&MyLogicalRepWorker->relmutex);

		/*
		 * UpdateSubscriptionRelState must be called within a transaction.
		 */
		if (!IsTransactionState())
			StartTransactionCommand();

		/* Persist the SYNCDONE state and its LSN in pg_subscription_rel. */
		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
								   MyLogicalRepWorker->relid,
								   MyLogicalRepWorker->relstate,
								   MyLogicalRepWorker->relstate_lsn,
								   false);

		/*
		 * End streaming so that LogRepWorkerWalRcvConn can be used to drop
		 * the slot.
		 */
		walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);

		/*
		 * Cleanup the tablesync slot.
		 *
		 * This has to be done after updating the state because otherwise if
		 * there is an error while doing the database operations we won't be
		 * able to rollback dropped slot.
		 */
		ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
										MyLogicalRepWorker->relid,
										syncslotname,
										sizeof(syncslotname));

		/*
		 * It is important to give an error if we are unable to drop the slot,
		 * otherwise, it won't be dropped till the corresponding subscription
		 * is dropped. So passing missing_ok = false.
		 */
		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);

		CommitTransactionCommand();
		pgstat_report_stat(false);

		/*
		 * Start a new transaction to clean up the tablesync origin tracking.
		 * This transaction will be ended within the FinishSyncWorker().  Even
		 * if we fail to remove the origin here, the apply worker will ensure
		 * it is cleaned up afterward.
		 *
		 * We need to do this after the table state is set to SYNCDONE.
		 * Otherwise, if an error occurs while performing the database
		 * operation, the worker will be restarted and the in-memory state of
		 * replication progress (remote_lsn) won't be rolled-back which would
		 * have been cleared before restart. So, the restarted worker will use
		 * invalid replication progress state resulting in replay of
		 * transactions that have already been applied.
		 */
		StartTransactionCommand();

		ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
										   MyLogicalRepWorker->relid,
										   originname,
										   sizeof(originname));

		/*
		 * Resetting the origin session removes the ownership of the slot.
		 * This is needed to allow the origin to be dropped.
		 */
		replorigin_session_reset();
		replorigin_xact_clear(true);

		/*
		 * Drop the tablesync's origin tracking if exists.
		 *
		 * There is a chance that the user is concurrently performing refresh
		 * for the subscription where we remove the table state and its origin
		 * or the apply worker would have removed this origin. So passing
		 * missing_ok = true.
		 */
		replorigin_drop_by_name(originname, true, false);

		/* Does not return. */
		FinishSyncWorker();
	}
	else
		SpinLockRelease(&MyLogicalRepWorker->relmutex);
}
344 :
/*
 * Handle table synchronization cooperation from the apply worker.
 *
 * Walk over all subscription tables that are individually tracked by the
 * apply process (currently, all that have state other than
 * SUBREL_STATE_READY) and manage synchronization for them.
 *
 * If there are tables that need synchronizing and are not being synchronized
 * yet, start sync workers for them (if there are free slots for sync
 * workers).  To prevent starting the sync worker for the same relation at a
 * high frequency after a failure, we store its last start time with each sync
 * state info.  We start the sync worker for the same relation after waiting
 * at least wal_retrieve_retry_interval.
 *
 * For tables that are being synchronized already, check if sync workers
 * either need action from the apply worker or have finished.  This is the
 * SYNCWAIT to CATCHUP transition.
 *
 * If the synchronization position is reached (SYNCDONE), then the table can
 * be marked as READY and is no longer tracked.
 *
 * May call proc_exit(0) (and hence not return) when all tablesyncs have
 * reached READY and a pending two_phase mode must be enabled via restart.
 */
void
ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
{
	struct tablesync_start_time_mapping
	{
		Oid			relid;
		TimestampTz last_start_time;
	};
	/* Per-relation last worker start times; survives across calls. */
	static HTAB *last_start_times = NULL;
	ListCell   *lc;
	bool		started_tx;
	bool		should_exit = false;
	Relation	rel = NULL;

	Assert(!IsTransactionState());

	/* We need up-to-date sync state info for subscription tables here. */
	FetchRelationStates(NULL, NULL, &started_tx);

	/*
	 * Prepare a hash table for tracking last start times of workers, to avoid
	 * immediate restarts.  We don't need it if there are no tables that need
	 * syncing.
	 */
	if (table_states_not_ready != NIL && !last_start_times)
	{
		HASHCTL		ctl;

		ctl.keysize = sizeof(Oid);
		ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
		last_start_times = hash_create("Logical replication table sync worker start times",
									   256, &ctl, HASH_ELEM | HASH_BLOBS);
	}

	/*
	 * Clean up the hash table when we're done with all tables (just to
	 * release the bit of memory).
	 */
	else if (table_states_not_ready == NIL && last_start_times)
	{
		hash_destroy(last_start_times);
		last_start_times = NULL;
	}

	/*
	 * Process all tables that are being synchronized.
	 */
	foreach(lc, table_states_not_ready)
	{
		SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);

		/* Catalog access below requires an open transaction. */
		if (!started_tx)
		{
			StartTransactionCommand();
			started_tx = true;
		}

		Assert(get_rel_relkind(rstate->relid) != RELKIND_SEQUENCE);

		if (rstate->state == SUBREL_STATE_SYNCDONE)
		{
			/*
			 * Apply has caught up to the position where the table sync has
			 * finished.  Mark the table as ready so that the apply will just
			 * continue to replicate it normally.
			 */
			if (current_lsn >= rstate->lsn)
			{
				char		originname[NAMEDATALEN];

				rstate->state = SUBREL_STATE_READY;
				rstate->lsn = current_lsn;

				/*
				 * Remove the tablesync origin tracking if exists.
				 *
				 * There is a chance that the user is concurrently performing
				 * refresh for the subscription where we remove the table
				 * state and its origin or the tablesync worker would have
				 * already removed this origin.  We can't rely on tablesync
				 * worker to remove the origin tracking as if there is any
				 * error while dropping we won't restart it to drop the
				 * origin.  So passing missing_ok = true.
				 *
				 * Lock the subscription and origin in the same order as we
				 * are doing during DDL commands to avoid deadlocks.  See
				 * AlterSubscription_refresh.
				 */
				LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
								 0, AccessShareLock);

				/* Open pg_subscription_rel once and reuse across iterations. */
				if (!rel)
					rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);

				ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
												   rstate->relid,
												   originname,
												   sizeof(originname));
				replorigin_drop_by_name(originname, true, false);

				/*
				 * Update the state to READY only after the origin cleanup.
				 */
				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
										   rstate->relid, rstate->state,
										   rstate->lsn, true);
			}
		}
		else
		{
			LogicalRepWorker *syncworker;

			/*
			 * Look for a sync worker for this relation.
			 */
			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

			syncworker = logicalrep_worker_find(WORKERTYPE_TABLESYNC,
												MyLogicalRepWorker->subid,
												rstate->relid, false);

			if (syncworker)
			{
				/* Found one, update our copy of its state */
				SpinLockAcquire(&syncworker->relmutex);
				rstate->state = syncworker->relstate;
				rstate->lsn = syncworker->relstate_lsn;
				if (rstate->state == SUBREL_STATE_SYNCWAIT)
				{
					/*
					 * Sync worker is waiting for apply.  Tell sync worker it
					 * can catchup now.
					 */
					syncworker->relstate = SUBREL_STATE_CATCHUP;
					syncworker->relstate_lsn =
						Max(syncworker->relstate_lsn, current_lsn);
				}
				SpinLockRelease(&syncworker->relmutex);

				/* If we told worker to catch up, wait for it. */
				if (rstate->state == SUBREL_STATE_SYNCWAIT)
				{
					/* Signal the sync worker, as it may be waiting for us. */
					if (syncworker->proc)
						logicalrep_worker_wakeup_ptr(syncworker);

					/* Now safe to release the LWLock */
					LWLockRelease(LogicalRepWorkerLock);

					if (started_tx)
					{
						/*
						 * We must commit the existing transaction to release
						 * the existing locks before entering a busy loop.
						 * This is required to avoid any undetected deadlocks
						 * due to any existing lock as deadlock detector won't
						 * be able to detect the waits on the latch.
						 *
						 * Also close any tables prior to the commit.
						 */
						if (rel)
						{
							table_close(rel, NoLock);
							rel = NULL;
						}
						CommitTransactionCommand();
						pgstat_report_stat(false);
					}

					/*
					 * Enter busy loop and wait for synchronization worker to
					 * reach expected state (or die trying).
					 */
					StartTransactionCommand();
					started_tx = true;

					wait_for_table_state_change(rstate->relid,
												SUBREL_STATE_SYNCDONE);
				}
				else
					LWLockRelease(LogicalRepWorkerLock);
			}
			else
			{
				/*
				 * If there is no sync worker for this table yet, count
				 * running sync workers for this subscription, while we have
				 * the lock.
				 */
				int			nsyncworkers =
					logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
				struct tablesync_start_time_mapping *hentry;
				bool		found;

				/* Now safe to release the LWLock */
				LWLockRelease(LogicalRepWorkerLock);

				/* Track (or look up) when we last started a worker for it. */
				hentry = hash_search(last_start_times, &rstate->relid,
									 HASH_ENTER, &found);
				if (!found)
					hentry->last_start_time = 0;

				launch_sync_worker(WORKERTYPE_TABLESYNC, nsyncworkers,
								   rstate->relid, &hentry->last_start_time);
			}
		}
	}

	/* Close table if opened */
	if (rel)
		table_close(rel, NoLock);

	if (started_tx)
	{
		/*
		 * Even when the two_phase mode is requested by the user, it remains
		 * as 'pending' until all tablesyncs have reached READY state.
		 *
		 * When this happens, we restart the apply worker and (if the
		 * conditions are still ok) then the two_phase tri-state will become
		 * 'enabled' at that time.
		 *
		 * Note: If the subscription has no tables then leave the state as
		 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
		 * work.
		 */
		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
		{
			CommandCounterIncrement();	/* make updates visible */
			if (AllTablesyncsReady())
			{
				ereport(LOG,
						(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
								MySubscription->name)));
				should_exit = true;
			}
		}

		CommitTransactionCommand();
		pgstat_report_stat(true);
	}

	if (should_exit)
	{
		/*
		 * Reset the last-start time for this worker so that the launcher will
		 * restart it without waiting for wal_retrieve_retry_interval.
		 */
		ApplyLauncherForgetWorkerStartTime(MySubscription->oid);

		proc_exit(0);
	}
}
620 :
621 : /*
622 : * Create list of columns for COPY based on logical relation mapping.
623 : */
624 : static List *
625 203 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
626 : {
627 203 : List *attnamelist = NIL;
628 : int i;
629 :
630 539 : for (i = 0; i < rel->remoterel.natts; i++)
631 : {
632 336 : attnamelist = lappend(attnamelist,
633 336 : makeString(rel->remoterel.attnames[i]));
634 : }
635 :
636 :
637 203 : return attnamelist;
638 : }
639 :
/*
 * Data source callback for the COPY FROM, which reads from the remote
 * connection and passes the data back to our local COPY.
 *
 * Fills outbuf with up to maxread bytes; keeps going until at least minread
 * bytes have been gathered (or the stream ends).  Returns the number of
 * bytes placed in outbuf.  Any surplus received data is kept in the
 * file-level copybuf for the next call.
 */
static int
copy_read_data(void *outbuf, int minread, int maxread)
{
	int			bytesread = 0;
	int			avail;

	/* If there are some leftover data from previous read, use it. */
	avail = copybuf->len - copybuf->cursor;
	if (avail)
	{
		if (avail > maxread)
			avail = maxread;
		memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
		copybuf->cursor += avail;
		maxread -= avail;
		bytesread += avail;
	}

	while (maxread > 0 && bytesread < minread)
	{
		pgsocket	fd = PGINVALID_SOCKET;
		int			len;
		char	   *buf = NULL;

		for (;;)
		{
			/* Try read the data. */
			len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);

			CHECK_FOR_INTERRUPTS();

			if (len == 0)
			{
				/* No data available right now; go wait on the socket/latch. */
				break;
			}
			else if (len < 0)
			{
				/* Stream ended; return whatever we have accumulated. */
				return bytesread;
			}
			else
			{
				/*
				 * Process the data.  Note that copybuf is pointed at the
				 * receive buffer returned by walrcv_receive rather than
				 * copying it; leftover bytes stay valid until the next
				 * receive call.
				 */
				copybuf->data = buf;
				copybuf->len = len;
				copybuf->cursor = 0;

				avail = copybuf->len - copybuf->cursor;
				if (avail > maxread)
					avail = maxread;
				memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
				outbuf = (char *) outbuf + avail;
				copybuf->cursor += avail;
				maxread -= avail;
				bytesread += avail;
			}

			if (maxread <= 0 || bytesread >= minread)
				return bytesread;
		}

		/*
		 * Wait for more data or latch.  The timeout ensures we re-check for
		 * interrupts periodically even if nothing arrives.
		 */
		(void) WaitLatchOrSocket(MyLatch,
								 WL_SOCKET_READABLE | WL_LATCH_SET |
								 WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
								 fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);

		ResetLatch(MyLatch);
	}

	return bytesread;
}
713 :
714 :
715 : /*
716 : * Get information about remote relation in similar fashion the RELATION
717 : * message provides during replication.
718 : *
719 : * This function also returns (a) the relation qualifications to be used in
720 : * the COPY command, and (b) whether the remote relation has published any
721 : * generated column.
722 : */
723 : static void
724 206 : fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
725 : List **qual, bool *gencol_published)
726 : {
727 : WalRcvExecResult *res;
728 : StringInfoData cmd;
729 : TupleTableSlot *slot;
730 206 : Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
731 206 : Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
732 206 : Oid qualRow[] = {TEXTOID};
733 : bool isnull;
734 : int natt;
735 206 : StringInfo pub_names = NULL;
736 206 : Bitmapset *included_cols = NULL;
737 206 : int server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
738 :
739 206 : lrel->nspname = nspname;
740 206 : lrel->relname = relname;
741 :
742 : /* First fetch Oid and replica identity. */
743 206 : initStringInfo(&cmd);
744 206 : appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
745 : " FROM pg_catalog.pg_class c"
746 : " INNER JOIN pg_catalog.pg_namespace n"
747 : " ON (c.relnamespace = n.oid)"
748 : " WHERE n.nspname = %s"
749 : " AND c.relname = %s",
750 : quote_literal_cstr(nspname),
751 : quote_literal_cstr(relname));
752 206 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
753 : lengthof(tableRow), tableRow);
754 :
755 205 : if (res->status != WALRCV_OK_TUPLES)
756 0 : ereport(ERROR,
757 : (errcode(ERRCODE_CONNECTION_FAILURE),
758 : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
759 : nspname, relname, res->err)));
760 :
761 205 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
762 205 : if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
763 0 : ereport(ERROR,
764 : (errcode(ERRCODE_UNDEFINED_OBJECT),
765 : errmsg("table \"%s.%s\" not found on publisher",
766 : nspname, relname)));
767 :
768 205 : lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
769 : Assert(!isnull);
770 205 : lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
771 : Assert(!isnull);
772 205 : lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
773 : Assert(!isnull);
774 :
775 205 : ExecDropSingleTupleTableSlot(slot);
776 205 : walrcv_clear_result(res);
777 :
778 :
779 : /*
780 : * Get column lists for each relation.
781 : *
782 : * We need to do this before fetching info about column names and types,
783 : * so that we can skip columns that should not be replicated.
784 : */
785 205 : if (server_version >= 150000)
786 : {
787 : WalRcvExecResult *pubres;
788 : TupleTableSlot *tslot;
789 205 : Oid attrsRow[] = {INT2VECTOROID};
790 :
791 : /* Build the pub_names comma-separated string. */
792 205 : pub_names = makeStringInfo();
793 205 : GetPublicationsStr(MySubscription->publications, pub_names, true);
794 :
795 : /*
796 : * Fetch info about column lists for the relation (from all the
797 : * publications).
798 : */
799 205 : resetStringInfo(&cmd);
800 205 : appendStringInfo(&cmd,
801 : "SELECT DISTINCT"
802 : " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
803 : " THEN NULL ELSE gpt.attrs END)"
804 : " FROM pg_publication p,"
805 : " LATERAL pg_get_publication_tables(p.pubname) gpt,"
806 : " pg_class c"
807 : " WHERE gpt.relid = %u AND c.oid = gpt.relid"
808 : " AND p.pubname IN ( %s )",
809 : lrel->remoteid,
810 : pub_names->data);
811 :
812 205 : pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
813 : lengthof(attrsRow), attrsRow);
814 :
815 205 : if (pubres->status != WALRCV_OK_TUPLES)
816 0 : ereport(ERROR,
817 : (errcode(ERRCODE_CONNECTION_FAILURE),
818 : errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
819 : nspname, relname, pubres->err)));
820 :
821 : /*
822 : * We don't support the case where the column list is different for
823 : * the same table when combining publications. See comments atop
824 : * fetch_relation_list. So there should be only one row returned.
825 : * Although we already checked this when creating the subscription, we
826 : * still need to check here in case the column list was changed after
827 : * creating the subscription and before the sync worker is started.
828 : */
829 205 : if (tuplestore_tuple_count(pubres->tuplestore) > 1)
830 0 : ereport(ERROR,
831 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
832 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
833 : nspname, relname));
834 :
835 : /*
836 : * Get the column list and build a single bitmap with the attnums.
837 : *
838 : * If we find a NULL value, it means all the columns should be
839 : * replicated.
840 : */
841 205 : tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
842 205 : if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
843 : {
844 205 : Datum cfval = slot_getattr(tslot, 1, &isnull);
845 :
846 205 : if (!isnull)
847 : {
848 : ArrayType *arr;
849 : int nelems;
850 : int16 *elems;
851 :
852 22 : arr = DatumGetArrayTypeP(cfval);
853 22 : nelems = ARR_DIMS(arr)[0];
854 22 : elems = (int16 *) ARR_DATA_PTR(arr);
855 :
856 59 : for (natt = 0; natt < nelems; natt++)
857 37 : included_cols = bms_add_member(included_cols, elems[natt]);
858 : }
859 :
860 205 : ExecClearTuple(tslot);
861 : }
862 205 : ExecDropSingleTupleTableSlot(tslot);
863 :
864 205 : walrcv_clear_result(pubres);
865 : }
866 :
867 : /*
868 : * Now fetch column names and types.
869 : */
870 205 : resetStringInfo(&cmd);
871 205 : appendStringInfoString(&cmd,
872 : "SELECT a.attnum,"
873 : " a.attname,"
874 : " a.atttypid,"
875 : " a.attnum = ANY(i.indkey)");
876 :
877 : /* Generated columns can be replicated since version 18. */
878 205 : if (server_version >= 180000)
879 205 : appendStringInfoString(&cmd, ", a.attgenerated != ''");
880 :
881 410 : appendStringInfo(&cmd,
882 : " FROM pg_catalog.pg_attribute a"
883 : " LEFT JOIN pg_catalog.pg_index i"
884 : " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
885 : " WHERE a.attnum > 0::pg_catalog.int2"
886 : " AND NOT a.attisdropped %s"
887 : " AND a.attrelid = %u"
888 : " ORDER BY a.attnum",
889 : lrel->remoteid,
890 205 : (server_version >= 120000 && server_version < 180000 ?
891 : "AND a.attgenerated = ''" : ""),
892 : lrel->remoteid);
893 205 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
894 : server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
895 :
896 205 : if (res->status != WALRCV_OK_TUPLES)
897 0 : ereport(ERROR,
898 : (errcode(ERRCODE_CONNECTION_FAILURE),
899 : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
900 : nspname, relname, res->err)));
901 :
902 : /* We don't know the number of rows coming, so allocate enough space. */
903 205 : lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
904 205 : lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
905 205 : lrel->attkeys = NULL;
906 :
907 : /*
908 : * Store the columns as a list of names. Ignore those that are not
909 : * present in the column list, if there is one.
910 : */
911 205 : natt = 0;
912 205 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
913 578 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
914 : {
915 : char *rel_colname;
916 : AttrNumber attnum;
917 :
918 373 : attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
919 : Assert(!isnull);
920 :
921 : /* If the column is not in the column list, skip it. */
922 373 : if (included_cols != NULL && !bms_is_member(attnum, included_cols))
923 : {
924 31 : ExecClearTuple(slot);
925 31 : continue;
926 : }
927 :
928 342 : rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
929 : Assert(!isnull);
930 :
931 342 : lrel->attnames[natt] = rel_colname;
932 342 : lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
933 : Assert(!isnull);
934 :
935 342 : if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
936 110 : lrel->attkeys = bms_add_member(lrel->attkeys, natt);
937 :
938 : /* Remember if the remote table has published any generated column. */
939 342 : if (server_version >= 180000 && !(*gencol_published))
940 : {
941 342 : *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
942 : Assert(!isnull);
943 : }
944 :
945 : /* Should never happen. */
946 342 : if (++natt >= MaxTupleAttributeNumber)
947 0 : elog(ERROR, "too many columns in remote table \"%s.%s\"",
948 : nspname, relname);
949 :
950 342 : ExecClearTuple(slot);
951 : }
952 205 : ExecDropSingleTupleTableSlot(slot);
953 :
954 205 : lrel->natts = natt;
955 :
956 205 : walrcv_clear_result(res);
957 :
958 : /*
959 : * Get relation's row filter expressions. DISTINCT avoids the same
960 : * expression of a table in multiple publications from being included
961 : * multiple times in the final expression.
962 : *
963 : * We need to copy the row even if it matches just one of the
964 : * publications, so we later combine all the quals with OR.
965 : *
966 : * For initial synchronization, row filtering can be ignored in following
967 : * cases:
968 : *
969 : * 1) one of the subscribed publications for the table hasn't specified
970 : * any row filter
971 : *
972 : * 2) one of the subscribed publications has puballtables set to true
973 : *
974 : * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
975 : * that includes this relation
976 : */
977 205 : if (server_version >= 150000)
978 : {
979 : /* Reuse the already-built pub_names. */
980 : Assert(pub_names != NULL);
981 :
982 : /* Check for row filters. */
983 205 : resetStringInfo(&cmd);
984 205 : appendStringInfo(&cmd,
985 : "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
986 : " FROM pg_publication p,"
987 : " LATERAL pg_get_publication_tables(p.pubname) gpt"
988 : " WHERE gpt.relid = %u"
989 : " AND p.pubname IN ( %s )",
990 : lrel->remoteid,
991 : pub_names->data);
992 :
993 205 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
994 :
995 205 : if (res->status != WALRCV_OK_TUPLES)
996 0 : ereport(ERROR,
997 : (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
998 : nspname, relname, res->err)));
999 :
1000 : /*
1001 : * Multiple row filter expressions for the same table will be combined
1002 : * by COPY using OR. If any of the filter expressions for this table
1003 : * are null, it means the whole table will be copied. In this case it
1004 : * is not necessary to construct a unified row filter expression at
1005 : * all.
1006 : */
1007 205 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1008 220 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1009 : {
1010 209 : Datum rf = slot_getattr(slot, 1, &isnull);
1011 :
1012 209 : if (!isnull)
1013 15 : *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1014 : else
1015 : {
1016 : /* Ignore filters and cleanup as necessary. */
1017 194 : if (*qual)
1018 : {
1019 3 : list_free_deep(*qual);
1020 3 : *qual = NIL;
1021 : }
1022 194 : break;
1023 : }
1024 :
1025 15 : ExecClearTuple(slot);
1026 : }
1027 205 : ExecDropSingleTupleTableSlot(slot);
1028 :
1029 205 : walrcv_clear_result(res);
1030 205 : destroyStringInfo(pub_names);
1031 : }
1032 :
1033 205 : pfree(cmd.data);
1034 205 : }
1035 :
1036 : /*
1037 : * Copy existing data of a table from publisher.
1038 : *
1039 : * Caller is responsible for locking the local relation.
1040 : */
1041 : static void
1042 206 : copy_table(Relation rel)
1043 : {
1044 : LogicalRepRelMapEntry *relmapentry;
1045 : LogicalRepRelation lrel;
1046 206 : List *qual = NIL;
1047 : WalRcvExecResult *res;
1048 : StringInfoData cmd;
1049 : CopyFromState cstate;
1050 : List *attnamelist;
1051 : ParseState *pstate;
1052 206 : List *options = NIL;
1053 206 : bool gencol_published = false;
1054 :
1055 : /* Get the publisher relation info. */
1056 206 : fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
1057 206 : RelationGetRelationName(rel), &lrel, &qual,
1058 : &gencol_published);
1059 :
1060 : /* Put the relation into relmap. */
1061 205 : logicalrep_relmap_update(&lrel);
1062 :
1063 : /* Map the publisher relation to local one. */
1064 205 : relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
1065 : Assert(rel == relmapentry->localrel);
1066 :
1067 : /* Start copy on the publisher. */
1068 203 : initStringInfo(&cmd);
1069 :
1070 : /* Regular or partitioned table with no row filter or generated columns */
1071 203 : if ((lrel.relkind == RELKIND_RELATION || lrel.relkind == RELKIND_PARTITIONED_TABLE)
1072 203 : && qual == NIL && !gencol_published)
1073 : {
1074 189 : appendStringInfo(&cmd, "COPY %s",
1075 189 : quote_qualified_identifier(lrel.nspname, lrel.relname));
1076 :
1077 : /* If the table has columns, then specify the columns */
1078 189 : if (lrel.natts)
1079 : {
1080 188 : appendStringInfoString(&cmd, " (");
1081 :
1082 : /*
1083 : * XXX Do we need to list the columns in all cases? Maybe we're
1084 : * replicating all columns?
1085 : */
1086 503 : for (int i = 0; i < lrel.natts; i++)
1087 : {
1088 315 : if (i > 0)
1089 127 : appendStringInfoString(&cmd, ", ");
1090 :
1091 315 : appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1092 : }
1093 :
1094 188 : appendStringInfoChar(&cmd, ')');
1095 : }
1096 :
1097 189 : appendStringInfoString(&cmd, " TO STDOUT");
1098 : }
1099 : else
1100 : {
1101 : /*
1102 : * For non-tables and tables with row filters, we need to do COPY
1103 : * (SELECT ...), but we can't just do SELECT * because we may need to
1104 : * copy only subset of columns including generated columns. For tables
1105 : * with any row filters, build a SELECT query with OR'ed row filters
1106 : * for COPY.
1107 : *
1108 : * We also need to use this same COPY (SELECT ...) syntax when
1109 : * generated columns are published, because copy of generated columns
1110 : * is not supported by the normal COPY.
1111 : */
1112 14 : appendStringInfoString(&cmd, "COPY (SELECT ");
1113 35 : for (int i = 0; i < lrel.natts; i++)
1114 : {
1115 21 : appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1116 21 : if (i < lrel.natts - 1)
1117 7 : appendStringInfoString(&cmd, ", ");
1118 : }
1119 :
1120 14 : appendStringInfoString(&cmd, " FROM ");
1121 :
1122 : /*
1123 : * For regular tables, make sure we don't copy data from a child that
1124 : * inherits the named table as those will be copied separately.
1125 : */
1126 14 : if (lrel.relkind == RELKIND_RELATION)
1127 11 : appendStringInfoString(&cmd, "ONLY ");
1128 :
1129 14 : appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
1130 : /* list of OR'ed filters */
1131 14 : if (qual != NIL)
1132 : {
1133 : ListCell *lc;
1134 11 : char *q = strVal(linitial(qual));
1135 :
1136 11 : appendStringInfo(&cmd, " WHERE %s", q);
1137 12 : for_each_from(lc, qual, 1)
1138 : {
1139 1 : q = strVal(lfirst(lc));
1140 1 : appendStringInfo(&cmd, " OR %s", q);
1141 : }
1142 11 : list_free_deep(qual);
1143 : }
1144 :
1145 14 : appendStringInfoString(&cmd, ") TO STDOUT");
1146 : }
1147 :
1148 : /*
1149 : * Prior to v16, initial table synchronization will use text format even
1150 : * if the binary option is enabled for a subscription.
1151 : */
1152 203 : if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
1153 203 : MySubscription->binary)
1154 : {
1155 5 : appendStringInfoString(&cmd, " WITH (FORMAT binary)");
1156 5 : options = list_make1(makeDefElem("format",
1157 : (Node *) makeString("binary"), -1));
1158 : }
1159 :
1160 203 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
1161 203 : pfree(cmd.data);
1162 203 : if (res->status != WALRCV_OK_COPY_OUT)
1163 0 : ereport(ERROR,
1164 : (errcode(ERRCODE_CONNECTION_FAILURE),
1165 : errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1166 : lrel.nspname, lrel.relname, res->err)));
1167 203 : walrcv_clear_result(res);
1168 :
1169 203 : copybuf = makeStringInfo();
1170 :
1171 203 : pstate = make_parsestate(NULL);
1172 203 : (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
1173 : NULL, false, false);
1174 :
1175 203 : attnamelist = make_copy_attnamelist(relmapentry);
1176 203 : cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
1177 :
1178 : /* Do the copy */
1179 202 : (void) CopyFrom(cstate);
1180 :
1181 192 : logicalrep_rel_close(relmapentry, NoLock);
1182 192 : }
1183 :
1184 : /*
1185 : * Determine the tablesync slot name.
1186 : *
1187 : * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
1188 : * on slot name length. We append system_identifier to avoid slot_name
1189 : * collision with subscriptions in other clusters. With the current scheme
1190 : * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
1191 : * length of slot_name will be 50.
1192 : *
1193 : * The returned slot name is stored in the supplied buffer (syncslotname) with
1194 : * the given size.
1195 : *
1196 : * Note: We don't use the subscription slot name as part of tablesync slot name
1197 : * because we are responsible for cleaning up these slots and it could become
1198 : * impossible to recalculate what name to cleanup if the subscription slot name
1199 : * had changed.
1200 : */
void
ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
								char *syncslotname, Size szslot)
{
	/*
	 * Embed subscription OID, relation OID, and the local system identifier
	 * in the name; snprintf truncates safely if szslot is too small.
	 */
	snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
			 relid, GetSystemIdentifier());
}
1208 :
1209 : /*
1210 : * Start syncing the table in the sync worker.
1211 : *
1212 : * If nothing needs to be done to sync the table, we exit the worker without
1213 : * any further action.
1214 : *
1215 : * The returned slot name is palloc'ed in current memory context.
1216 : */
1217 : static char *
1218 207 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
1219 : {
1220 : char *slotname;
1221 : char *err;
1222 : char relstate;
1223 : XLogRecPtr relstate_lsn;
1224 : Relation rel;
1225 : AclResult aclresult;
1226 : WalRcvExecResult *res;
1227 : char originname[NAMEDATALEN];
1228 : ReplOriginId originid;
1229 : UserContext ucxt;
1230 : bool must_use_password;
1231 : bool run_as_owner;
1232 :
1233 : /* Check the state of the table synchronization. */
1234 207 : StartTransactionCommand();
1235 207 : relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
1236 207 : MyLogicalRepWorker->relid,
1237 : &relstate_lsn);
1238 207 : CommitTransactionCommand();
1239 :
1240 : /* Is the use of a password mandatory? */
1241 410 : must_use_password = MySubscription->passwordrequired &&
1242 203 : !MySubscription->ownersuperuser;
1243 :
1244 207 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1245 207 : MyLogicalRepWorker->relstate = relstate;
1246 207 : MyLogicalRepWorker->relstate_lsn = relstate_lsn;
1247 207 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1248 :
1249 : /*
1250 : * If synchronization is already done or no longer necessary, exit now
1251 : * that we've updated shared memory state.
1252 : */
1253 207 : switch (relstate)
1254 : {
1255 0 : case SUBREL_STATE_SYNCDONE:
1256 : case SUBREL_STATE_READY:
1257 : case SUBREL_STATE_UNKNOWN:
1258 0 : FinishSyncWorker(); /* doesn't return */
1259 : }
1260 :
1261 : /* Calculate the name of the tablesync slot. */
1262 207 : slotname = (char *) palloc(NAMEDATALEN);
1263 207 : ReplicationSlotNameForTablesync(MySubscription->oid,
1264 207 : MyLogicalRepWorker->relid,
1265 : slotname,
1266 : NAMEDATALEN);
1267 :
1268 : /*
1269 : * Here we use the slot name instead of the subscription name as the
1270 : * application_name, so that it is different from the leader apply worker,
1271 : * so that synchronous replication can distinguish them.
1272 : */
1273 207 : LogRepWorkerWalRcvConn =
1274 207 : walrcv_connect(MySubscription->conninfo, true, true,
1275 : must_use_password,
1276 : slotname, &err);
1277 207 : if (LogRepWorkerWalRcvConn == NULL)
1278 0 : ereport(ERROR,
1279 : (errcode(ERRCODE_CONNECTION_FAILURE),
1280 : errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
1281 : MySubscription->name, err)));
1282 :
1283 : Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
1284 : MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
1285 : MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
1286 :
1287 : /* Assign the origin tracking record name. */
1288 207 : ReplicationOriginNameForLogicalRep(MySubscription->oid,
1289 207 : MyLogicalRepWorker->relid,
1290 : originname,
1291 : sizeof(originname));
1292 :
1293 207 : if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
1294 : {
1295 : /*
1296 : * We have previously errored out before finishing the copy so the
1297 : * replication slot might exist. We want to remove the slot if it
1298 : * already exists and proceed.
1299 : *
1300 : * XXX We could also instead try to drop the slot, last time we failed
1301 : * but for that, we might need to clean up the copy state as it might
1302 : * be in the middle of fetching the rows. Also, if there is a network
1303 : * breakdown then it wouldn't have succeeded so trying it next time
1304 : * seems like a better bet.
1305 : */
1306 9 : ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
1307 : }
1308 198 : else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
1309 : {
1310 : /*
1311 : * The COPY phase was previously done, but tablesync then crashed
1312 : * before it was able to finish normally.
1313 : */
1314 0 : StartTransactionCommand();
1315 :
1316 : /*
1317 : * The origin tracking name must already exist. It was created first
1318 : * time this tablesync was launched.
1319 : */
1320 0 : originid = replorigin_by_name(originname, false);
1321 0 : replorigin_session_setup(originid, 0);
1322 0 : replorigin_xact_state.origin = originid;
1323 0 : *origin_startpos = replorigin_session_get_progress(false);
1324 :
1325 0 : CommitTransactionCommand();
1326 :
1327 0 : goto copy_table_done;
1328 : }
1329 :
1330 207 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1331 207 : MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
1332 207 : MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
1333 207 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1334 :
1335 : /*
1336 : * Update the state, create the replication origin, and make them visible
1337 : * to others.
1338 : */
1339 207 : StartTransactionCommand();
1340 207 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1341 207 : MyLogicalRepWorker->relid,
1342 207 : MyLogicalRepWorker->relstate,
1343 207 : MyLogicalRepWorker->relstate_lsn,
1344 : false);
1345 :
1346 : /*
1347 : * Create the replication origin in a separate transaction from the one
1348 : * that sets up the origin in shared memory. This prevents the risk that
1349 : * changes to the origin in shared memory cannot be rolled back if the
1350 : * transaction aborts.
1351 : */
1352 206 : originid = replorigin_by_name(originname, true);
1353 206 : if (!OidIsValid(originid))
1354 197 : originid = replorigin_create(originname);
1355 :
1356 206 : CommitTransactionCommand();
1357 206 : pgstat_report_stat(true);
1358 :
1359 206 : StartTransactionCommand();
1360 :
1361 : /*
1362 : * Use a standard write lock here. It might be better to disallow access
1363 : * to the table while it's being synchronized. But we don't want to block
1364 : * the main apply process from working and it has to open the relation in
1365 : * RowExclusiveLock when remapping remote relation id to local one.
1366 : */
1367 206 : rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
1368 :
1369 : /*
1370 : * Start a transaction in the remote node in REPEATABLE READ mode. This
1371 : * ensures that both the replication slot we create (see below) and the
1372 : * COPY are consistent with each other.
1373 : */
1374 206 : res = walrcv_exec(LogRepWorkerWalRcvConn,
1375 : "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1376 : 0, NULL);
1377 206 : if (res->status != WALRCV_OK_COMMAND)
1378 0 : ereport(ERROR,
1379 : (errcode(ERRCODE_CONNECTION_FAILURE),
1380 : errmsg("table copy could not start transaction on publisher: %s",
1381 : res->err)));
1382 206 : walrcv_clear_result(res);
1383 :
1384 : /*
1385 : * Create a new permanent logical decoding slot. This slot will be used
1386 : * for the catchup phase after COPY is done, so tell it to use the
1387 : * snapshot to make the final data consistent.
1388 : */
1389 206 : walrcv_create_slot(LogRepWorkerWalRcvConn,
1390 : slotname, false /* permanent */ , false /* two_phase */ ,
1391 : MySubscription->failover,
1392 : CRS_USE_SNAPSHOT, origin_startpos);
1393 :
1394 : /*
1395 : * Advance the origin to the LSN got from walrcv_create_slot and then set
1396 : * up the origin. The advancement is WAL logged for the purpose of
1397 : * recovery. Locks are to prevent the replication origin from vanishing
1398 : * while advancing.
1399 : *
1400 : * The purpose of doing these before the copy is to avoid doing the copy
1401 : * again due to any error in advancing or setting up origin tracking.
1402 : */
1403 206 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1404 206 : replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
1405 : true /* go backward */ , true /* WAL log */ );
1406 206 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1407 :
1408 206 : replorigin_session_setup(originid, 0);
1409 206 : replorigin_xact_state.origin = originid;
1410 :
1411 : /*
1412 : * If the user did not opt to run as the owner of the subscription
1413 : * ('run_as_owner'), then copy the table as the owner of the table.
1414 : */
1415 206 : run_as_owner = MySubscription->runasowner;
1416 206 : if (!run_as_owner)
1417 205 : SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
1418 :
1419 : /*
1420 : * Check that our table sync worker has permission to insert into the
1421 : * target table.
1422 : */
1423 206 : aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1424 : ACL_INSERT);
1425 206 : if (aclresult != ACLCHECK_OK)
1426 0 : aclcheck_error(aclresult,
1427 0 : get_relkind_objtype(rel->rd_rel->relkind),
1428 0 : RelationGetRelationName(rel));
1429 :
1430 : /*
1431 : * COPY FROM does not honor RLS policies. That is not a problem for
1432 : * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
1433 : * who has it implicitly), but other roles should not be able to
1434 : * circumvent RLS. Disallow logical replication into RLS enabled
1435 : * relations for such roles.
1436 : */
1437 206 : if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
1438 0 : ereport(ERROR,
1439 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1440 : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1441 : GetUserNameFromId(GetUserId(), true),
1442 : RelationGetRelationName(rel))));
1443 :
1444 : /* Now do the initial data copy */
1445 206 : PushActiveSnapshot(GetTransactionSnapshot());
1446 206 : copy_table(rel);
1447 192 : PopActiveSnapshot();
1448 :
1449 192 : res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
1450 192 : if (res->status != WALRCV_OK_COMMAND)
1451 0 : ereport(ERROR,
1452 : (errcode(ERRCODE_CONNECTION_FAILURE),
1453 : errmsg("table copy could not finish transaction on publisher: %s",
1454 : res->err)));
1455 192 : walrcv_clear_result(res);
1456 :
1457 192 : if (!run_as_owner)
1458 191 : RestoreUserContext(&ucxt);
1459 :
1460 192 : table_close(rel, NoLock);
1461 :
1462 : /* Make the copy visible. */
1463 192 : CommandCounterIncrement();
1464 :
1465 : /*
1466 : * Update the persisted state to indicate the COPY phase is done; make it
1467 : * visible to others.
1468 : */
1469 192 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1470 192 : MyLogicalRepWorker->relid,
1471 : SUBREL_STATE_FINISHEDCOPY,
1472 192 : MyLogicalRepWorker->relstate_lsn,
1473 : false);
1474 :
1475 192 : CommitTransactionCommand();
1476 :
1477 192 : copy_table_done:
1478 :
1479 192 : elog(DEBUG1,
1480 : "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
1481 : originname, LSN_FORMAT_ARGS(*origin_startpos));
1482 :
1483 : /*
1484 : * We are done with the initial data synchronization, update the state.
1485 : */
1486 192 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1487 192 : MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
1488 192 : MyLogicalRepWorker->relstate_lsn = *origin_startpos;
1489 192 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1490 :
1491 : /*
1492 : * Finally, wait until the leader apply worker tells us to catch up and
1493 : * then return to let LogicalRepApplyLoop do it.
1494 : */
1495 192 : wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
1496 192 : return slotname;
1497 : }
1498 :
1499 : /*
1500 : * Execute the initial sync with error handling. Disable the subscription,
1501 : * if it's required.
1502 : *
1503 : * Allocate the slot name in long-lived context on return. Note that we don't
1504 : * handle FATAL errors which are probably because of system resource error and
1505 : * are not repeatable.
1506 : */
1507 : static void
1508 207 : start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
1509 : {
1510 207 : char *sync_slotname = NULL;
1511 :
1512 : Assert(am_tablesync_worker());
1513 :
1514 207 : PG_TRY();
1515 : {
1516 : /* Call initial sync. */
1517 207 : sync_slotname = LogicalRepSyncTableStart(origin_startpos);
1518 : }
1519 13 : PG_CATCH();
1520 : {
1521 13 : if (MySubscription->disableonerr)
1522 1 : DisableSubscriptionAndExit();
1523 : else
1524 : {
1525 : /*
1526 : * Report the worker failed during table synchronization. Abort
1527 : * the current transaction so that the stats message is sent in an
1528 : * idle state.
1529 : */
1530 12 : AbortOutOfAnyTransaction();
1531 12 : pgstat_report_subscription_error(MySubscription->oid);
1532 :
1533 12 : PG_RE_THROW();
1534 : }
1535 : }
1536 192 : PG_END_TRY();
1537 :
1538 : /* allocate slot name in long-lived context */
1539 192 : *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
1540 192 : pfree(sync_slotname);
1541 192 : }
1542 :
1543 : /*
1544 : * Runs the tablesync worker.
1545 : *
1546 : * It starts syncing tables. After a successful sync, sets streaming options
1547 : * and starts streaming to catchup with apply worker.
1548 : */
1549 : static void
1550 207 : run_tablesync_worker(void)
1551 : {
1552 : char originname[NAMEDATALEN];
1553 207 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
1554 207 : char *slotname = NULL;
1555 : WalRcvStreamOptions options;
1556 :
1557 207 : start_table_sync(&origin_startpos, &slotname);
1558 :
1559 192 : ReplicationOriginNameForLogicalRep(MySubscription->oid,
1560 192 : MyLogicalRepWorker->relid,
1561 : originname,
1562 : sizeof(originname));
1563 :
1564 192 : set_apply_error_context_origin(originname);
1565 :
1566 192 : set_stream_options(&options, slotname, &origin_startpos);
1567 :
1568 192 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
1569 :
1570 : /* Apply the changes till we catchup with the apply worker. */
1571 192 : start_apply(origin_startpos);
1572 0 : }
1573 :
1574 : /* Logical Replication Tablesync worker entry point */
1575 : void
1576 207 : TableSyncWorkerMain(Datum main_arg)
1577 : {
1578 207 : int worker_slot = DatumGetInt32(main_arg);
1579 :
1580 207 : SetupApplyOrSyncWorker(worker_slot);
1581 :
1582 207 : run_tablesync_worker();
1583 :
1584 0 : FinishSyncWorker();
1585 : }
1586 :
1587 : /*
1588 : * If the subscription has no tables then return false.
1589 : *
1590 : * Otherwise, are all tablesyncs READY?
1591 : *
1592 : * Note: This function is not suitable to be called from outside of apply or
1593 : * tablesync workers because MySubscription needs to be already initialized.
1594 : */
1595 : bool
1596 189 : AllTablesyncsReady(void)
1597 : {
1598 : bool started_tx;
1599 : bool has_tables;
1600 :
1601 : /* We need up-to-date sync state info for subscription tables here. */
1602 189 : FetchRelationStates(&has_tables, NULL, &started_tx);
1603 :
1604 189 : if (started_tx)
1605 : {
1606 16 : CommitTransactionCommand();
1607 16 : pgstat_report_stat(true);
1608 : }
1609 :
1610 : /*
1611 : * Return false when there are no tables in subscription or not all tables
1612 : * are in ready state; true otherwise.
1613 : */
1614 189 : return has_tables && (table_states_not_ready == NIL);
1615 : }
1616 :
1617 : /*
1618 : * Return whether the subscription currently has any tables.
1619 : *
1620 : * Note: Unlike HasSubscriptionTables(), this function relies on cached
1621 : * information for subscription tables. Additionally, it should not be
1622 : * invoked outside of apply or tablesync workers, as MySubscription must be
1623 : * initialized first.
1624 : */
1625 : bool
1626 106 : HasSubscriptionTablesCached(void)
1627 : {
1628 : bool started_tx;
1629 : bool has_tables;
1630 :
1631 : /* We need up-to-date subscription tables info here */
1632 106 : FetchRelationStates(&has_tables, NULL, &started_tx);
1633 :
1634 106 : if (started_tx)
1635 : {
1636 0 : CommitTransactionCommand();
1637 0 : pgstat_report_stat(true);
1638 : }
1639 :
1640 106 : return has_tables;
1641 : }
1642 :
1643 : /*
1644 : * Update the two_phase state of the specified subscription in pg_subscription.
1645 : */
1646 : void
1647 10 : UpdateTwoPhaseState(Oid suboid, char new_state)
1648 : {
1649 : Relation rel;
1650 : HeapTuple tup;
1651 : bool nulls[Natts_pg_subscription];
1652 : bool replaces[Natts_pg_subscription];
1653 : Datum values[Natts_pg_subscription];
1654 :
1655 : Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
1656 : new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1657 : new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1658 :
1659 10 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1660 10 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
1661 10 : if (!HeapTupleIsValid(tup))
1662 0 : elog(ERROR,
1663 : "cache lookup failed for subscription oid %u",
1664 : suboid);
1665 :
1666 : /* Form a new tuple. */
1667 10 : memset(values, 0, sizeof(values));
1668 10 : memset(nulls, false, sizeof(nulls));
1669 10 : memset(replaces, false, sizeof(replaces));
1670 :
1671 : /* And update/set two_phase state */
1672 10 : values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1673 10 : replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1674 :
1675 10 : tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1676 : values, nulls, replaces);
1677 10 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1678 :
1679 10 : heap_freetuple(tup);
1680 10 : table_close(rel, RowExclusiveLock);
1681 10 : }
|