Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * tablesync.c
3 : * PostgreSQL logical replication: initial table data synchronization
4 : *
5 : * Copyright (c) 2012-2026, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/tablesync.c
9 : *
10 : * NOTES
11 : * This file contains code for initial table data synchronization for
12 : * logical replication.
13 : *
14 : * The initial data synchronization is done separately for each table,
15 : * in a separate apply worker that only fetches the initial snapshot data
16 : * from the publisher and then synchronizes the position in the stream with
17 : * the leader apply worker.
18 : *
19 : * There are several reasons for doing the synchronization this way:
20 : * - It allows us to parallelize the initial data synchronization
21 : * which lowers the time needed for it to happen.
22 : * - The initial synchronization does not have to hold the xid and LSN
23 : * for the time it takes to copy data of all tables, causing less
24 : * bloat and lower disk consumption compared to doing the
25 : * synchronization in a single process for the whole database.
26 : * - It allows us to synchronize any tables added after the initial
27 : * synchronization has finished.
28 : *
29 : * The stream position synchronization works in multiple steps:
30 : * - Apply worker requests a tablesync worker to start, setting the new
31 : * table state to INIT.
32 : * - Tablesync worker starts; changes table state from INIT to DATASYNC while
33 : * copying.
34 : * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
35 : * worker specific) state to indicate when the copy phase has completed, so
36 : * if the worker crashes with this (non-memory) state then the copy will not
37 : * be re-attempted.
38 : * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
39 : * - Apply worker periodically checks for tables in SYNCWAIT state. When
40 : * any appear, it sets the table state to CATCHUP and starts loop-waiting
41 : * until either the table state is set to SYNCDONE or the sync worker
42 : * exits.
43 : * - After the sync worker has seen the state change to CATCHUP, it will
44 : * read the stream and apply changes (acting like an apply worker) until
45 : * it catches up to the specified stream position. Then it sets the
46 : * state to SYNCDONE. There might be zero changes applied between
47 : * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
48 : * apply worker.
49 : * - Once the state is set to SYNCDONE, the apply will continue tracking
50 : * the table until it reaches the SYNCDONE stream position, at which
51 : * point it sets state to READY and stops tracking. Again, there might
52 : * be zero changes in between.
53 : *
54 : * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
55 : * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
56 : *
57 : * The catalog pg_subscription_rel is used to keep information about
58 : * subscribed tables and their state. The catalog holds all states
59 : * except SYNCWAIT and CATCHUP which are only in shared memory.
60 : *
61 : * Example flows look like this:
62 : * - Apply is in front:
63 : * sync:8
64 : * -> set in catalog FINISHEDCOPY
65 : * -> set in memory SYNCWAIT
66 : * apply:10
67 : * -> set in memory CATCHUP
68 : * -> enter wait-loop
69 : * sync:10
70 : * -> set in catalog SYNCDONE
71 : * -> exit
72 : * apply:10
73 : * -> exit wait-loop
74 : * -> continue rep
75 : * apply:11
76 : * -> set in catalog READY
77 : *
78 : * - Sync is in front:
79 : * sync:10
80 : * -> set in catalog FINISHEDCOPY
81 : * -> set in memory SYNCWAIT
82 : * apply:8
83 : * -> set in memory CATCHUP
84 : * -> continue per-table filtering
85 : * sync:10
86 : * -> set in catalog SYNCDONE
87 : * -> exit
88 : * apply:10
89 : * -> set in catalog READY
90 : * -> stop per-table filtering
91 : * -> continue rep
92 : *-------------------------------------------------------------------------
93 : */
94 :
95 : #include "postgres.h"
96 :
97 : #include "access/table.h"
98 : #include "access/xact.h"
99 : #include "catalog/indexing.h"
100 : #include "catalog/pg_subscription_rel.h"
101 : #include "catalog/pg_type.h"
102 : #include "commands/copy.h"
103 : #include "miscadmin.h"
104 : #include "nodes/makefuncs.h"
105 : #include "parser/parse_relation.h"
106 : #include "pgstat.h"
107 : #include "replication/logicallauncher.h"
108 : #include "replication/logicalrelation.h"
109 : #include "replication/logicalworker.h"
110 : #include "replication/origin.h"
111 : #include "replication/slot.h"
112 : #include "replication/walreceiver.h"
113 : #include "replication/worker_internal.h"
114 : #include "storage/ipc.h"
115 : #include "storage/latch.h"
116 : #include "storage/lmgr.h"
117 : #include "utils/acl.h"
118 : #include "utils/array.h"
119 : #include "utils/builtins.h"
120 : #include "utils/lsyscache.h"
121 : #include "utils/rls.h"
122 : #include "utils/snapmgr.h"
123 : #include "utils/syscache.h"
124 : #include "utils/usercontext.h"
125 : #include "utils/wait_event.h"
126 :
127 : List *table_states_not_ready = NIL;
128 :
129 : static StringInfo copybuf = NULL;
130 :
131 : /*
132 : * Wait until the relation sync state is set in the catalog to the expected
133 : * one; return true when it happens.
134 : *
135 : * Returns false if the table sync worker or the table itself have
136 : * disappeared, or the table state has been reset.
137 : *
138 : * Currently, this is used in the apply worker when transitioning from
139 : * CATCHUP state to SYNCDONE.
140 : */
141 : static bool
142 197 : wait_for_table_state_change(Oid relid, char expected_state)
143 : {
144 : char state;
145 :
146 : for (;;)
147 217 : {
148 : LogicalRepWorker *worker;
149 : XLogRecPtr statelsn;
150 :
151 414 : CHECK_FOR_INTERRUPTS();
152 :
153 414 : InvalidateCatalogSnapshot();
154 414 : state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
155 : relid, &statelsn);
156 :
157 414 : if (state == SUBREL_STATE_UNKNOWN)
158 0 : break;
159 :
160 414 : if (state == expected_state)
161 197 : return true;
162 :
163 : /* Check if the sync worker is still running and bail if not. */
164 217 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
165 217 : worker = logicalrep_worker_find(WORKERTYPE_TABLESYNC,
166 217 : MyLogicalRepWorker->subid, relid,
167 : false);
168 217 : LWLockRelease(LogicalRepWorkerLock);
169 217 : if (!worker)
170 0 : break;
171 :
172 217 : (void) WaitLatch(MyLatch,
173 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
174 : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
175 :
176 217 : ResetLatch(MyLatch);
177 : }
178 :
179 0 : return false;
180 : }
181 :
182 : /*
183 : * Wait until the apply worker changes the state of our synchronization
184 : * worker to the expected one.
185 : *
186 : * Used when transitioning from SYNCWAIT state to CATCHUP.
187 : *
188 : * Returns false if the apply worker has disappeared.
189 : */
190 : static bool
191 199 : wait_for_worker_state_change(char expected_state)
192 : {
193 : int rc;
194 :
195 : for (;;)
196 199 : {
197 : LogicalRepWorker *worker;
198 :
199 398 : CHECK_FOR_INTERRUPTS();
200 :
201 : /*
202 : * Done if already in correct state. (We assume this fetch is atomic
203 : * enough to not give a misleading answer if we do it with no lock.)
204 : */
205 398 : if (MyLogicalRepWorker->relstate == expected_state)
206 199 : return true;
207 :
208 : /*
209 : * Bail out if the apply worker has died, else signal it we're
210 : * waiting.
211 : */
212 199 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
213 199 : worker = logicalrep_worker_find(WORKERTYPE_APPLY,
214 199 : MyLogicalRepWorker->subid, InvalidOid,
215 : false);
216 199 : if (worker && worker->proc)
217 199 : logicalrep_worker_wakeup_ptr(worker);
218 199 : LWLockRelease(LogicalRepWorkerLock);
219 199 : if (!worker)
220 0 : break;
221 :
222 : /*
223 : * Wait. We expect to get a latch signal back from the apply worker,
224 : * but use a timeout in case it dies without sending one.
225 : */
226 199 : rc = WaitLatch(MyLatch,
227 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
228 : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
229 :
230 199 : if (rc & WL_LATCH_SET)
231 199 : ResetLatch(MyLatch);
232 : }
233 :
234 0 : return false;
235 : }
236 :
/*
 * Handle table synchronization cooperation from the synchronization
 * worker.
 *
 * If the sync worker is in CATCHUP state and reached (or passed) the
 * predetermined synchronization point in the WAL stream, mark the table as
 * SYNCDONE and finish.
 *
 * Note: when the SYNCDONE transition is taken, this function does not
 * return to the caller; it ends in FinishSyncWorker() (which, per the
 * comments below, is expected to end the transaction and exit the worker).
 * The ordering of the steps below (catalog update, then end streaming,
 * then slot drop, then origin cleanup) is deliberate -- see the inline
 * comments; do not reorder.
 */
void
ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
{
	/* relstate/relstate_lsn are shared with the apply worker; lock them. */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);

	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
		current_lsn >= MyLogicalRepWorker->relstate_lsn)
	{
		TimeLineID	tli;
		char		syncslotname[NAMEDATALEN] = {0};
		char		originname[NAMEDATALEN] = {0};

		/* Record the transition in shared memory before persisting it. */
		MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
		MyLogicalRepWorker->relstate_lsn = current_lsn;

		SpinLockRelease(&MyLogicalRepWorker->relmutex);

		/*
		 * UpdateSubscriptionRelState must be called within a transaction.
		 */
		if (!IsTransactionState())
			StartTransactionCommand();

		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
								   MyLogicalRepWorker->relid,
								   MyLogicalRepWorker->relstate,
								   MyLogicalRepWorker->relstate_lsn,
								   false);

		/*
		 * End streaming so that LogRepWorkerWalRcvConn can be used to drop
		 * the slot.
		 */
		walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);

		/*
		 * Cleanup the tablesync slot.
		 *
		 * This has to be done after updating the state because otherwise if
		 * there is an error while doing the database operations we won't be
		 * able to rollback dropped slot.
		 */
		ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
										MyLogicalRepWorker->relid,
										syncslotname,
										sizeof(syncslotname));

		/*
		 * It is important to give an error if we are unable to drop the slot,
		 * otherwise, it won't be dropped till the corresponding subscription
		 * is dropped. So passing missing_ok = false.
		 */
		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);

		CommitTransactionCommand();
		pgstat_report_stat(false);

		/*
		 * Start a new transaction to clean up the tablesync origin tracking.
		 * This transaction will be ended within the FinishSyncWorker(). Now,
		 * even, if we fail to remove this here, the apply worker will ensure
		 * to clean it up afterward.
		 *
		 * We need to do this after the table state is set to SYNCDONE.
		 * Otherwise, if an error occurs while performing the database
		 * operation, the worker will be restarted and the in-memory state of
		 * replication progress (remote_lsn) won't be rolled-back which would
		 * have been cleared before restart. So, the restarted worker will use
		 * invalid replication progress state resulting in replay of
		 * transactions that have already been applied.
		 */
		StartTransactionCommand();

		ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
										   MyLogicalRepWorker->relid,
										   originname,
										   sizeof(originname));

		/*
		 * Resetting the origin session removes the ownership of the slot.
		 * This is needed to allow the origin to be dropped.
		 */
		replorigin_session_reset();
		replorigin_xact_clear(true);

		/*
		 * Drop the tablesync's origin tracking if exists.
		 *
		 * There is a chance that the user is concurrently performing refresh
		 * for the subscription where we remove the table state and its origin
		 * or the apply worker would have removed this origin. So passing
		 * missing_ok = true.
		 */
		replorigin_drop_by_name(originname, true, false);

		FinishSyncWorker();
	}
	else
		SpinLockRelease(&MyLogicalRepWorker->relmutex);
}
345 :
/*
 * Handle table synchronization cooperation from the apply worker.
 *
 * Walk over all subscription tables that are individually tracked by the
 * apply process (currently, all that have state other than
 * SUBREL_STATE_READY) and manage synchronization for them.
 *
 * If there are tables that need synchronizing and are not being synchronized
 * yet, start sync workers for them (if there are free slots for sync
 * workers). To prevent starting the sync worker for the same relation at a
 * high frequency after a failure, we store its last start time with each sync
 * state info. We start the sync worker for the same relation after waiting
 * at least wal_retrieve_retry_interval.
 *
 * For tables that are being synchronized already, check if sync workers
 * either need action from the apply worker or have finished. This is the
 * SYNCWAIT to CATCHUP transition.
 *
 * If the synchronization position is reached (SYNCDONE), then the table can
 * be marked as READY and is no longer tracked.
 */
void
ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
{
	/* Hash entry type mapping a relation OID to its last worker start. */
	struct tablesync_start_time_mapping
	{
		Oid			relid;
		TimestampTz last_start_time;
	};
	/* Persists across calls; created/destroyed lazily below. */
	static HTAB *last_start_times = NULL;
	ListCell   *lc;
	bool		started_tx;
	bool		should_exit = false;
	Relation	rel = NULL;

	Assert(!IsTransactionState());

	/* We need up-to-date sync state info for subscription tables here. */
	/* (FetchRelationStates reports via started_tx whether it opened a tx.) */
	FetchRelationStates(NULL, NULL, &started_tx);

	/*
	 * Prepare a hash table for tracking last start times of workers, to avoid
	 * immediate restarts.  We don't need it if there are no tables that need
	 * syncing.
	 */
	if (table_states_not_ready != NIL && !last_start_times)
	{
		HASHCTL		ctl;

		ctl.keysize = sizeof(Oid);
		ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
		last_start_times = hash_create("Logical replication table sync worker start times",
									   256, &ctl, HASH_ELEM | HASH_BLOBS);
	}

	/*
	 * Clean up the hash table when we're done with all tables (just to
	 * release the bit of memory).
	 */
	else if (table_states_not_ready == NIL && last_start_times)
	{
		hash_destroy(last_start_times);
		last_start_times = NULL;
	}

	/*
	 * Process all tables that are being synchronized.
	 */
	foreach(lc, table_states_not_ready)
	{
		SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);

		/* Catalog access below requires a transaction. */
		if (!started_tx)
		{
			StartTransactionCommand();
			started_tx = true;
		}

		Assert(get_rel_relkind(rstate->relid) != RELKIND_SEQUENCE);

		if (rstate->state == SUBREL_STATE_SYNCDONE)
		{
			/*
			 * Apply has caught up to the position where the table sync has
			 * finished.  Mark the table as ready so that the apply will just
			 * continue to replicate it normally.
			 */
			if (current_lsn >= rstate->lsn)
			{
				char		originname[NAMEDATALEN];

				rstate->state = SUBREL_STATE_READY;
				rstate->lsn = current_lsn;

				/*
				 * Remove the tablesync origin tracking if exists.
				 *
				 * There is a chance that the user is concurrently performing
				 * refresh for the subscription where we remove the table
				 * state and its origin or the tablesync worker would have
				 * already removed this origin. We can't rely on tablesync
				 * worker to remove the origin tracking as if there is any
				 * error while dropping we won't restart it to drop the
				 * origin. So passing missing_ok = true.
				 *
				 * Lock the subscription and origin in the same order as we
				 * are doing during DDL commands to avoid deadlocks. See
				 * AlterSubscription_refresh.
				 */
				LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
								 0, AccessShareLock);

				/* Open pg_subscription_rel once; closed after the loop. */
				if (!rel)
					rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);

				ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
												   rstate->relid,
												   originname,
												   sizeof(originname));
				replorigin_drop_by_name(originname, true, false);

				/*
				 * Update the state to READY only after the origin cleanup.
				 */
				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
										   rstate->relid, rstate->state,
										   rstate->lsn, true);
			}
		}
		else
		{
			LogicalRepWorker *syncworker;

			/*
			 * Look for a sync worker for this relation.
			 */
			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

			syncworker = logicalrep_worker_find(WORKERTYPE_TABLESYNC,
												MyLogicalRepWorker->subid,
												rstate->relid, false);

			if (syncworker)
			{
				/* Found one, update our copy of its state */
				SpinLockAcquire(&syncworker->relmutex);
				rstate->state = syncworker->relstate;
				rstate->lsn = syncworker->relstate_lsn;
				if (rstate->state == SUBREL_STATE_SYNCWAIT)
				{
					/*
					 * Sync worker is waiting for apply.  Tell sync worker it
					 * can catchup now.
					 */
					syncworker->relstate = SUBREL_STATE_CATCHUP;
					syncworker->relstate_lsn =
						Max(syncworker->relstate_lsn, current_lsn);
				}
				SpinLockRelease(&syncworker->relmutex);

				/* If we told worker to catch up, wait for it. */
				if (rstate->state == SUBREL_STATE_SYNCWAIT)
				{
					/* Signal the sync worker, as it may be waiting for us. */
					if (syncworker->proc)
						logicalrep_worker_wakeup_ptr(syncworker);

					/* Now safe to release the LWLock */
					LWLockRelease(LogicalRepWorkerLock);

					if (started_tx)
					{
						/*
						 * We must commit the existing transaction to release
						 * the existing locks before entering a busy loop.
						 * This is required to avoid any undetected deadlocks
						 * due to any existing lock as deadlock detector won't
						 * be able to detect the waits on the latch.
						 *
						 * Also close any tables prior to the commit.
						 */
						if (rel)
						{
							table_close(rel, NoLock);
							rel = NULL;
						}
						CommitTransactionCommand();
						pgstat_report_stat(false);
					}

					/*
					 * Enter busy loop and wait for synchronization worker to
					 * reach expected state (or die trying).
					 */
					StartTransactionCommand();
					started_tx = true;

					wait_for_table_state_change(rstate->relid,
												SUBREL_STATE_SYNCDONE);
				}
				else
					LWLockRelease(LogicalRepWorkerLock);
			}
			else
			{
				/*
				 * If there is no sync worker for this table yet, count
				 * running sync workers for this subscription, while we have
				 * the lock.
				 */
				int			nsyncworkers =
					logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
				struct tablesync_start_time_mapping *hentry;
				bool		found;

				/* Now safe to release the LWLock */
				LWLockRelease(LogicalRepWorkerLock);

				/* Look up (or create) this relation's start-time entry. */
				hentry = hash_search(last_start_times, &rstate->relid,
									 HASH_ENTER, &found);
				if (!found)
					hentry->last_start_time = 0;

				/*
				 * launch_sync_worker is presumed to honor nsyncworkers and
				 * last_start_time to throttle restarts -- see its definition.
				 */
				launch_sync_worker(WORKERTYPE_TABLESYNC, nsyncworkers,
								   rstate->relid, &hentry->last_start_time);
			}
		}
	}

	/* Close table if opened */
	if (rel)
		table_close(rel, NoLock);


	if (started_tx)
	{
		/*
		 * Even when the two_phase mode is requested by the user, it remains
		 * as 'pending' until all tablesyncs have reached READY state.
		 *
		 * When this happens, we restart the apply worker and (if the
		 * conditions are still ok) then the two_phase tri-state will become
		 * 'enabled' at that time.
		 *
		 * Note: If the subscription has no tables then leave the state as
		 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
		 * work.
		 */
		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
		{
			CommandCounterIncrement();	/* make updates visible */
			if (AllTablesyncsReady())
			{
				ereport(LOG,
						(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
								MySubscription->name)));
				should_exit = true;
			}
		}

		CommitTransactionCommand();
		pgstat_report_stat(true);
	}

	if (should_exit)
	{
		/*
		 * Reset the last-start time for this worker so that the launcher will
		 * restart it without waiting for wal_retrieve_retry_interval.
		 */
		ApplyLauncherForgetWorkerStartTime(MySubscription->oid);

		proc_exit(0);
	}
}
621 :
622 : /*
623 : * Create list of columns for COPY based on logical relation mapping.
624 : */
625 : static List *
626 211 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
627 : {
628 211 : List *attnamelist = NIL;
629 : int i;
630 :
631 557 : for (i = 0; i < rel->remoterel.natts; i++)
632 : {
633 346 : attnamelist = lappend(attnamelist,
634 346 : makeString(rel->remoterel.attnames[i]));
635 : }
636 :
637 :
638 211 : return attnamelist;
639 : }
640 :
641 : /*
642 : * Data source callback for the COPY FROM, which reads from the remote
643 : * connection and passes the data back to our local COPY.
644 : */
645 : static int
646 12067 : copy_read_data(void *outbuf, int minread, int maxread)
647 : {
648 12067 : int bytesread = 0;
649 : int avail;
650 :
651 : /* If there are some leftover data from previous read, use it. */
652 12067 : avail = copybuf->len - copybuf->cursor;
653 12067 : if (avail)
654 : {
655 0 : if (avail > maxread)
656 0 : avail = maxread;
657 0 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
658 0 : copybuf->cursor += avail;
659 0 : maxread -= avail;
660 0 : bytesread += avail;
661 : }
662 :
663 12070 : while (maxread > 0 && bytesread < minread)
664 : {
665 12070 : pgsocket fd = PGINVALID_SOCKET;
666 : int len;
667 12070 : char *buf = NULL;
668 :
669 : for (;;)
670 : {
671 : /* Try read the data. */
672 12070 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
673 :
674 12070 : CHECK_FOR_INTERRUPTS();
675 :
676 12070 : if (len == 0)
677 3 : break;
678 12067 : else if (len < 0)
679 12067 : return bytesread;
680 : else
681 : {
682 : /* Process the data */
683 11858 : copybuf->data = buf;
684 11858 : copybuf->len = len;
685 11858 : copybuf->cursor = 0;
686 :
687 11858 : avail = copybuf->len - copybuf->cursor;
688 11858 : if (avail > maxread)
689 0 : avail = maxread;
690 11858 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
691 11858 : outbuf = (char *) outbuf + avail;
692 11858 : copybuf->cursor += avail;
693 11858 : maxread -= avail;
694 11858 : bytesread += avail;
695 : }
696 :
697 11858 : if (maxread <= 0 || bytesread >= minread)
698 11858 : return bytesread;
699 : }
700 :
701 : /*
702 : * Wait for more data or latch.
703 : */
704 3 : (void) WaitLatchOrSocket(MyLatch,
705 : WL_SOCKET_READABLE | WL_LATCH_SET |
706 : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
707 : fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
708 :
709 3 : ResetLatch(MyLatch);
710 : }
711 :
712 0 : return bytesread;
713 : }
714 :
715 :
716 : /*
717 : * Get information about remote relation in similar fashion the RELATION
718 : * message provides during replication.
719 : *
720 : * This function also returns (a) the relation qualifications to be used in
721 : * the COPY command, and (b) whether the remote relation has published any
722 : * generated column.
723 : */
724 : static void
725 214 : fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
726 : List **qual, bool *gencol_published)
727 : {
728 : WalRcvExecResult *res;
729 : StringInfoData cmd;
730 : TupleTableSlot *slot;
731 214 : Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
732 214 : Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
733 214 : Oid qualRow[] = {TEXTOID};
734 : bool isnull;
735 : int natt;
736 214 : StringInfo pub_names = NULL;
737 214 : Bitmapset *included_cols = NULL;
738 214 : int server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
739 :
740 214 : lrel->nspname = nspname;
741 214 : lrel->relname = relname;
742 :
743 : /* First fetch Oid and replica identity. */
744 214 : initStringInfo(&cmd);
745 214 : appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
746 : " FROM pg_catalog.pg_class c"
747 : " INNER JOIN pg_catalog.pg_namespace n"
748 : " ON (c.relnamespace = n.oid)"
749 : " WHERE n.nspname = %s"
750 : " AND c.relname = %s",
751 : quote_literal_cstr(nspname),
752 : quote_literal_cstr(relname));
753 214 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
754 : lengthof(tableRow), tableRow);
755 :
756 214 : if (res->status != WALRCV_OK_TUPLES)
757 0 : ereport(ERROR,
758 : (errcode(ERRCODE_CONNECTION_FAILURE),
759 : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
760 : nspname, relname, res->err)));
761 :
762 214 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
763 214 : if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
764 0 : ereport(ERROR,
765 : (errcode(ERRCODE_UNDEFINED_OBJECT),
766 : errmsg("table \"%s.%s\" not found on publisher",
767 : nspname, relname)));
768 :
769 214 : lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
770 : Assert(!isnull);
771 214 : lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
772 : Assert(!isnull);
773 214 : lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
774 : Assert(!isnull);
775 :
776 214 : ExecDropSingleTupleTableSlot(slot);
777 214 : walrcv_clear_result(res);
778 :
779 :
780 : /*
781 : * Get column lists for each relation.
782 : *
783 : * We need to do this before fetching info about column names and types,
784 : * so that we can skip columns that should not be replicated.
785 : */
786 214 : if (server_version >= 150000)
787 : {
788 : WalRcvExecResult *pubres;
789 : TupleTableSlot *tslot;
790 214 : Oid attrsRow[] = {INT2VECTOROID};
791 :
792 : /* Build the pub_names comma-separated string. */
793 214 : pub_names = makeStringInfo();
794 214 : GetPublicationsStr(MySubscription->publications, pub_names, true);
795 :
796 : /*
797 : * Fetch info about column lists for the relation (from all the
798 : * publications).
799 : */
800 214 : resetStringInfo(&cmd);
801 :
802 214 : if (server_version >= 190000)
803 : {
804 : /*
805 : * We can pass both publication names and relid to
806 : * pg_get_publication_tables() since version 19.
807 : */
808 214 : appendStringInfo(&cmd,
809 : "SELECT DISTINCT"
810 : " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
811 : " THEN NULL ELSE gpt.attrs END)"
812 : " FROM pg_get_publication_tables(ARRAY[%s], %u) gpt,"
813 : " pg_class c"
814 : " WHERE c.oid = gpt.relid",
815 : pub_names->data,
816 : lrel->remoteid);
817 : }
818 : else
819 0 : appendStringInfo(&cmd,
820 : "SELECT DISTINCT"
821 : " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
822 : " THEN NULL ELSE gpt.attrs END)"
823 : " FROM pg_publication p,"
824 : " LATERAL pg_get_publication_tables(p.pubname) gpt,"
825 : " pg_class c"
826 : " WHERE gpt.relid = %u AND c.oid = gpt.relid"
827 : " AND p.pubname IN ( %s )",
828 : lrel->remoteid,
829 : pub_names->data);
830 :
831 214 : pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
832 : lengthof(attrsRow), attrsRow);
833 :
834 213 : if (pubres->status != WALRCV_OK_TUPLES)
835 0 : ereport(ERROR,
836 : (errcode(ERRCODE_CONNECTION_FAILURE),
837 : errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
838 : nspname, relname, pubres->err)));
839 :
840 : /*
841 : * We don't support the case where the column list is different for
842 : * the same table when combining publications. See comments atop
843 : * fetch_relation_list. So there should be only one row returned.
844 : * Although we already checked this when creating the subscription, we
845 : * still need to check here in case the column list was changed after
846 : * creating the subscription and before the sync worker is started.
847 : */
848 213 : if (tuplestore_tuple_count(pubres->tuplestore) > 1)
849 0 : ereport(ERROR,
850 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
851 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
852 : nspname, relname));
853 :
854 : /*
855 : * Get the column list and build a single bitmap with the attnums.
856 : *
857 : * If we find a NULL value, it means all the columns should be
858 : * replicated.
859 : */
860 213 : tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
861 213 : if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
862 : {
863 213 : Datum cfval = slot_getattr(tslot, 1, &isnull);
864 :
865 213 : if (!isnull)
866 : {
867 : ArrayType *arr;
868 : int nelems;
869 : int16 *elems;
870 :
871 22 : arr = DatumGetArrayTypeP(cfval);
872 22 : nelems = ARR_DIMS(arr)[0];
873 22 : elems = (int16 *) ARR_DATA_PTR(arr);
874 :
875 59 : for (natt = 0; natt < nelems; natt++)
876 37 : included_cols = bms_add_member(included_cols, elems[natt]);
877 : }
878 :
879 213 : ExecClearTuple(tslot);
880 : }
881 213 : ExecDropSingleTupleTableSlot(tslot);
882 :
883 213 : walrcv_clear_result(pubres);
884 : }
885 :
886 : /*
887 : * Now fetch column names and types.
888 : */
889 213 : resetStringInfo(&cmd);
890 213 : appendStringInfoString(&cmd,
891 : "SELECT a.attnum,"
892 : " a.attname,"
893 : " a.atttypid,"
894 : " a.attnum = ANY(i.indkey)");
895 :
896 : /* Generated columns can be replicated since version 18. */
897 213 : if (server_version >= 180000)
898 213 : appendStringInfoString(&cmd, ", a.attgenerated != ''");
899 :
900 426 : appendStringInfo(&cmd,
901 : " FROM pg_catalog.pg_attribute a"
902 : " LEFT JOIN pg_catalog.pg_index i"
903 : " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
904 : " WHERE a.attnum > 0::pg_catalog.int2"
905 : " AND NOT a.attisdropped %s"
906 : " AND a.attrelid = %u"
907 : " ORDER BY a.attnum",
908 : lrel->remoteid,
909 213 : (server_version >= 120000 && server_version < 180000 ?
910 : "AND a.attgenerated = ''" : ""),
911 : lrel->remoteid);
912 213 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
913 : server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
914 :
915 213 : if (res->status != WALRCV_OK_TUPLES)
916 0 : ereport(ERROR,
917 : (errcode(ERRCODE_CONNECTION_FAILURE),
918 : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
919 : nspname, relname, res->err)));
920 :
921 : /* We don't know the number of rows coming, so allocate enough space. */
922 213 : lrel->attnames = palloc0_array(char *, MaxTupleAttributeNumber);
923 213 : lrel->atttyps = palloc0_array(Oid, MaxTupleAttributeNumber);
924 213 : lrel->attkeys = NULL;
925 :
926 : /*
927 : * Store the columns as a list of names. Ignore those that are not
928 : * present in the column list, if there is one.
929 : */
930 213 : natt = 0;
931 213 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
932 596 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
933 : {
934 : char *rel_colname;
935 : AttrNumber attnum;
936 :
937 383 : attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
938 : Assert(!isnull);
939 :
940 : /* If the column is not in the column list, skip it. */
941 383 : if (included_cols != NULL && !bms_is_member(attnum, included_cols))
942 : {
943 31 : ExecClearTuple(slot);
944 31 : continue;
945 : }
946 :
947 352 : rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
948 : Assert(!isnull);
949 :
950 352 : lrel->attnames[natt] = rel_colname;
951 352 : lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
952 : Assert(!isnull);
953 :
954 352 : if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
955 112 : lrel->attkeys = bms_add_member(lrel->attkeys, natt);
956 :
957 : /* Remember if the remote table has published any generated column. */
958 352 : if (server_version >= 180000 && !(*gencol_published))
959 : {
960 352 : *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
961 : Assert(!isnull);
962 : }
963 :
964 : /* Should never happen. */
965 352 : if (++natt >= MaxTupleAttributeNumber)
966 0 : elog(ERROR, "too many columns in remote table \"%s.%s\"",
967 : nspname, relname);
968 :
969 352 : ExecClearTuple(slot);
970 : }
971 213 : ExecDropSingleTupleTableSlot(slot);
972 :
973 213 : lrel->natts = natt;
974 :
975 213 : walrcv_clear_result(res);
976 :
977 : /*
978 : * Get relation's row filter expressions. DISTINCT avoids the same
979 : * expression of a table in multiple publications from being included
980 : * multiple times in the final expression.
981 : *
982 : * We need to copy the row even if it matches just one of the
983 : * publications, so we later combine all the quals with OR.
984 : *
985 : * For initial synchronization, row filtering can be ignored in following
986 : * cases:
987 : *
988 : * 1) one of the subscribed publications for the table hasn't specified
989 : * any row filter
990 : *
991 : * 2) one of the subscribed publications has puballtables set to true
992 : *
993 : * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
994 : * that includes this relation
995 : */
996 213 : if (server_version >= 150000)
997 : {
998 : /* Reuse the already-built pub_names. */
999 : Assert(pub_names != NULL);
1000 :
1001 : /* Check for row filters. */
1002 213 : resetStringInfo(&cmd);
1003 :
1004 213 : if (server_version >= 190000)
1005 : {
1006 : /*
1007 : * We can pass both publication names and relid to
1008 : * pg_get_publication_tables() since version 19.
1009 : */
1010 213 : appendStringInfo(&cmd,
1011 : "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1012 : " FROM pg_get_publication_tables(ARRAY[%s], %u) gpt",
1013 : pub_names->data,
1014 : lrel->remoteid);
1015 : }
1016 : else
1017 0 : appendStringInfo(&cmd,
1018 : "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1019 : " FROM pg_publication p,"
1020 : " LATERAL pg_get_publication_tables(p.pubname) gpt"
1021 : " WHERE gpt.relid = %u"
1022 : " AND p.pubname IN ( %s )",
1023 : lrel->remoteid,
1024 : pub_names->data);
1025 :
1026 213 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
1027 :
1028 213 : if (res->status != WALRCV_OK_TUPLES)
1029 0 : ereport(ERROR,
1030 : (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1031 : nspname, relname, res->err)));
1032 :
1033 : /*
1034 : * Multiple row filter expressions for the same table will be combined
1035 : * by COPY using OR. If any of the filter expressions for this table
1036 : * are null, it means the whole table will be copied. In this case it
1037 : * is not necessary to construct a unified row filter expression at
1038 : * all.
1039 : */
1040 213 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1041 226 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1042 : {
1043 215 : Datum rf = slot_getattr(slot, 1, &isnull);
1044 :
1045 215 : if (!isnull)
1046 13 : *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1047 : else
1048 : {
1049 : /* Ignore filters and cleanup as necessary. */
1050 202 : if (*qual)
1051 : {
1052 1 : list_free_deep(*qual);
1053 1 : *qual = NIL;
1054 : }
1055 202 : break;
1056 : }
1057 :
1058 13 : ExecClearTuple(slot);
1059 : }
1060 213 : ExecDropSingleTupleTableSlot(slot);
1061 :
1062 213 : walrcv_clear_result(res);
1063 213 : destroyStringInfo(pub_names);
1064 : }
1065 :
1066 213 : pfree(cmd.data);
1067 213 : }
1068 :
1069 : /*
1070 : * Copy existing data of a table from publisher.
1071 : *
1072 : * Caller is responsible for locking the local relation.
1073 : */
1074 : static void
1075 214 : copy_table(Relation rel)
1076 : {
1077 : LogicalRepRelMapEntry *relmapentry;
1078 : LogicalRepRelation lrel;
1079 214 : List *qual = NIL;
1080 : WalRcvExecResult *res;
1081 : StringInfoData cmd;
1082 : CopyFromState cstate;
1083 : List *attnamelist;
1084 : ParseState *pstate;
1085 214 : List *options = NIL;
1086 214 : bool gencol_published = false;
1087 :
1088 : /* Get the publisher relation info. */
1089 214 : fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
1090 214 : RelationGetRelationName(rel), &lrel, &qual,
1091 : &gencol_published);
1092 :
1093 : /* Put the relation into relmap. */
1094 213 : logicalrep_relmap_update(&lrel);
1095 :
 : /* NoLock: per the header comment, the caller already locked the relation. */
1096 : /* Map the publisher relation to local one. */
1097 213 : relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
1098 : Assert(rel == relmapentry->localrel);
1099 :
1100 : /* Start copy on the publisher. */
1101 211 : initStringInfo(&cmd);
1102 :
1103 : /* Regular or partitioned table with no row filter or generated columns */
1104 211 : if ((lrel.relkind == RELKIND_RELATION || lrel.relkind == RELKIND_PARTITIONED_TABLE)
1105 211 : && qual == NIL && !gencol_published)
1106 : {
1107 197 : appendStringInfo(&cmd, "COPY %s",
1108 197 : quote_qualified_identifier(lrel.nspname, lrel.relname));
1109 :
1110 : /* If the table has columns, then specify the columns */
1111 197 : if (lrel.natts)
1112 : {
1113 196 : appendStringInfoString(&cmd, " (");
1114 :
1115 : /*
1116 : * XXX Do we need to list the columns in all cases? Maybe we're
1117 : * replicating all columns?
1118 : */
1119 521 : for (int i = 0; i < lrel.natts; i++)
1120 : {
1121 325 : if (i > 0)
1122 129 : appendStringInfoString(&cmd, ", ");
1123 :
1124 325 : appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1125 : }
1126 :
1127 196 : appendStringInfoChar(&cmd, ')');
1128 : }
1129 :
1130 197 : appendStringInfoString(&cmd, " TO STDOUT");
1131 : }
1132 : else
1133 : {
1134 : /*
1135 : * For non-tables and tables with row filters, we need to do COPY
1136 : * (SELECT ...), but we can't just do SELECT * because we may need to
1137 : * copy only subset of columns including generated columns. For tables
1138 : * with any row filters, build a SELECT query with OR'ed row filters
1139 : * for COPY.
1140 : *
1141 : * We also need to use this same COPY (SELECT ...) syntax when
1142 : * generated columns are published, because copy of generated columns
1143 : * is not supported by the normal COPY.
1144 : */
1145 14 : appendStringInfoString(&cmd, "COPY (SELECT ");
1146 35 : for (int i = 0; i < lrel.natts; i++)
1147 : {
1148 21 : appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1149 21 : if (i < lrel.natts - 1)
1150 7 : appendStringInfoString(&cmd, ", ");
1151 : }
1152 :
1153 14 : appendStringInfoString(&cmd, " FROM ");
1154 :
1155 : /*
1156 : * For regular tables, make sure we don't copy data from a child that
1157 : * inherits the named table as those will be copied separately.
1158 : */
1159 14 : if (lrel.relkind == RELKIND_RELATION)
1160 11 : appendStringInfoString(&cmd, "ONLY ");
1161 :
1162 14 : appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
1163 : /* list of OR'ed filters */
1164 14 : if (qual != NIL)
1165 : {
1166 : ListCell *lc;
1167 11 : char *q = strVal(linitial(qual));
1168 :
1169 11 : appendStringInfo(&cmd, " WHERE %s", q);
1170 12 : for_each_from(lc, qual, 1)
1171 : {
1172 1 : q = strVal(lfirst(lc));
1173 1 : appendStringInfo(&cmd, " OR %s", q);
1174 : }
1175 11 : list_free_deep(qual);
1176 : }
1177 :
1178 14 : appendStringInfoString(&cmd, ") TO STDOUT");
1179 : }
1180 :
1181 : /*
1182 : * Prior to v16, initial table synchronization will use text format even
1183 : * if the binary option is enabled for a subscription.
1184 : */
1185 211 : if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
1186 211 : MySubscription->binary)
1187 : {
1188 5 : appendStringInfoString(&cmd, " WITH (FORMAT binary)");
1189 5 : options = list_make1(makeDefElem("format",
1190 : (Node *) makeString("binary"), -1));
1191 : }
1192 :
1193 211 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
1194 211 : pfree(cmd.data);
1195 211 : if (res->status != WALRCV_OK_COPY_OUT)
1196 0 : ereport(ERROR,
1197 : (errcode(ERRCODE_CONNECTION_FAILURE),
1198 : errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1199 : lrel.nspname, lrel.relname, res->err)));
1200 211 : walrcv_clear_result(res);
1201 :
1202 211 : copybuf = makeStringInfo();
1203 :
1204 211 : pstate = make_parsestate(NULL);
1205 211 : (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
1206 : NULL, false, false);
1207 :
 : /* copy_read_data is the data-source callback: it streams the COPY rows
 : * arriving on the walreceiver connection into copybuf set up above. */
1208 211 : attnamelist = make_copy_attnamelist(relmapentry);
1209 211 : cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
1210 :
1211 : /* Do the copy */
1212 210 : (void) CopyFrom(cstate);
1213 :
1214 199 : logicalrep_rel_close(relmapentry, NoLock);
1215 : }
1216 :
1217 : /*
1218 : * Determine the tablesync slot name.
1219 : *
1220 : * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
1221 : * on slot name length. We append system_identifier to avoid slot_name
1222 : * collision with subscriptions in other clusters. With the current scheme
1223 : * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
1224 : * length of slot_name will be 50.
1225 : *
1226 : * The returned slot name is stored in the supplied buffer (syncslotname) with
1227 : * the given size.
1228 : *
1229 : * Note: We don't use the subscription slot name as part of tablesync slot name
1230 : * because we are responsible for cleaning up these slots and it could become
1231 : * impossible to recalculate what name to cleanup if the subscription slot name
1232 : * had changed.
1233 : */
1234 : void
1235 417 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
1236 : char *syncslotname, Size szslot)
1237 : {
 : /* snprintf NUL-terminates and silently truncates if szslot is too small;
 : * the header comment's length analysis shows 50 bytes always suffice. */
1238 417 : snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1239 : relid, GetSystemIdentifier());
1240 417 : }
1241 :
1242 : /*
1243 : * Start syncing the table in the sync worker.
1244 : *
1245 : * If nothing needs to be done to sync the table, we exit the worker without
1246 : * any further action.
1247 : *
1248 : * The returned slot name is palloc'ed in current memory context.
1249 : */
1250 : static char *
1251 214 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
1252 : {
1253 : char *slotname;
1254 : char *err;
1255 : char relstate;
1256 : XLogRecPtr relstate_lsn;
1257 : Relation rel;
1258 : AclResult aclresult;
1259 : WalRcvExecResult *res;
1260 : char originname[NAMEDATALEN];
1261 : ReplOriginId originid;
1262 : UserContext ucxt;
1263 : bool must_use_password;
1264 : bool run_as_owner;
1265 :
1266 : /* Check the state of the table synchronization. */
1267 214 : StartTransactionCommand();
1268 214 : relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
1269 214 : MyLogicalRepWorker->relid,
1270 : &relstate_lsn);
1271 214 : CommitTransactionCommand();
1272 :
1273 : /* Is the use of a password mandatory? */
1274 423 : must_use_password = MySubscription->passwordrequired &&
1275 209 : !MySubscription->ownersuperuser;
1276 :
1277 214 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1278 214 : MyLogicalRepWorker->relstate = relstate;
1279 214 : MyLogicalRepWorker->relstate_lsn = relstate_lsn;
1280 214 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1281 :
1282 : /*
1283 : * If synchronization is already done or no longer necessary, exit now
1284 : * that we've updated shared memory state.
1285 : */
1286 214 : switch (relstate)
1287 : {
1288 0 : case SUBREL_STATE_SYNCDONE:
1289 : case SUBREL_STATE_READY:
1290 : case SUBREL_STATE_UNKNOWN:
1291 0 : FinishSyncWorker(); /* doesn't return */
1292 : }
 : /* Any other state (INIT, DATASYNC, FINISHEDCOPY -- see Assert below)
 : * means the sync must be (re)started. */
1293 :
1294 : /* Calculate the name of the tablesync slot. */
1295 214 : slotname = (char *) palloc(NAMEDATALEN);
1296 214 : ReplicationSlotNameForTablesync(MySubscription->oid,
1297 214 : MyLogicalRepWorker->relid,
1298 : slotname,
1299 : NAMEDATALEN);
1300 :
1301 : /*
1302 : * Here we use the slot name instead of the subscription name as the
1303 : * application_name, so that it is different from the leader apply worker,
1304 : * so that synchronous replication can distinguish them.
1305 : */
1306 214 : LogRepWorkerWalRcvConn =
1307 214 : walrcv_connect(MySubscription->conninfo, true, true,
1308 : must_use_password,
1309 : slotname, &err);
1310 214 : if (LogRepWorkerWalRcvConn == NULL)
1311 0 : ereport(ERROR,
1312 : (errcode(ERRCODE_CONNECTION_FAILURE),
1313 : errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
1314 : MySubscription->name, err)));
1315 :
1316 : Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
1317 : MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
1318 : MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
1319 :
1320 : /* Assign the origin tracking record name. */
1321 214 : ReplicationOriginNameForLogicalRep(MySubscription->oid,
1322 214 : MyLogicalRepWorker->relid,
1323 : originname,
1324 : sizeof(originname));
1325 :
1326 214 : if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
1327 : {
1328 : /*
1329 : * We have previously errored out before finishing the copy so the
1330 : * replication slot might exist. We want to remove the slot if it
1331 : * already exists and proceed.
1332 : *
1333 : * XXX We could also instead try to drop the slot, last time we failed
1334 : * but for that, we might need to clean up the copy state as it might
1335 : * be in the middle of fetching the rows. Also, if there is a network
1336 : * breakdown then it wouldn't have succeeded so trying it next time
1337 : * seems like a better bet.
1338 : */
1339 10 : ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
1340 : }
1341 204 : else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
1342 : {
1343 : /*
1344 : * The COPY phase was previously done, but tablesync then crashed
1345 : * before it was able to finish normally.
1346 : */
1347 0 : StartTransactionCommand();
1348 :
1349 : /*
1350 : * The origin tracking name must already exist. It was created first
1351 : * time this tablesync was launched.
1352 : */
1353 0 : originid = replorigin_by_name(originname, false);
1354 0 : replorigin_session_setup(originid, 0);
1355 0 : replorigin_xact_state.origin = originid;
1356 0 : *origin_startpos = replorigin_session_get_progress(false);
1357 :
1358 0 : CommitTransactionCommand();
1359 :
1360 0 : goto copy_table_done;
1361 : }
1362 :
1363 214 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1364 214 : MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
1365 214 : MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
1366 214 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1367 :
1368 : /*
1369 : * Update the state, create the replication origin, and make them visible
1370 : * to others.
1371 : */
1372 214 : StartTransactionCommand();
1373 214 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1374 214 : MyLogicalRepWorker->relid,
1375 214 : MyLogicalRepWorker->relstate,
1376 214 : MyLogicalRepWorker->relstate_lsn,
1377 : false);
1378 :
1379 : /*
1380 : * Create the replication origin in a separate transaction from the one
1381 : * that sets up the origin in shared memory. This prevents the risk that
1382 : * changes to the origin in shared memory cannot be rolled back if the
1383 : * transaction aborts.
1384 : */
1385 214 : originid = replorigin_by_name(originname, true);
1386 214 : if (!OidIsValid(originid))
1387 204 : originid = replorigin_create(originname);
1388 :
1389 214 : CommitTransactionCommand();
1390 214 : pgstat_report_stat(true);
1391 :
1392 214 : StartTransactionCommand();
1393 :
1394 : /*
1395 : * Use a standard write lock here. It might be better to disallow access
1396 : * to the table while it's being synchronized. But we don't want to block
1397 : * the main apply process from working and it has to open the relation in
1398 : * RowExclusiveLock when remapping remote relation id to local one.
1399 : */
1400 214 : rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
1401 :
1402 : /*
1403 : * Start a transaction in the remote node in REPEATABLE READ mode. This
1404 : * ensures that both the replication slot we create (see below) and the
1405 : * COPY are consistent with each other.
1406 : */
1407 214 : res = walrcv_exec(LogRepWorkerWalRcvConn,
1408 : "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1409 : 0, NULL);
1410 214 : if (res->status != WALRCV_OK_COMMAND)
1411 0 : ereport(ERROR,
1412 : (errcode(ERRCODE_CONNECTION_FAILURE),
1413 : errmsg("table copy could not start transaction on publisher: %s",
1414 : res->err)));
1415 214 : walrcv_clear_result(res);
1416 :
1417 : /*
1418 : * Create a new permanent logical decoding slot. This slot will be used
1419 : * for the catchup phase after COPY is done, so tell it to use the
1420 : * snapshot to make the final data consistent.
1421 : */
1422 214 : walrcv_create_slot(LogRepWorkerWalRcvConn,
1423 : slotname, false /* permanent */ , false /* two_phase */ ,
1424 : MySubscription->failover,
1425 : CRS_USE_SNAPSHOT, origin_startpos);
1426 :
1427 : /*
1428 : * Advance the origin to the LSN got from walrcv_create_slot and then set
1429 : * up the origin. The advancement is WAL logged for the purpose of
1430 : * recovery. Locks are to prevent the replication origin from vanishing
1431 : * while advancing.
1432 : *
1433 : * The purpose of doing these before the copy is to avoid doing the copy
1434 : * again due to any error in advancing or setting up origin tracking.
1435 : */
1436 214 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1437 214 : replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
1438 : true /* go backward */ , true /* WAL log */ );
1439 214 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1440 :
1441 214 : replorigin_session_setup(originid, 0);
1442 214 : replorigin_xact_state.origin = originid;
1443 :
1444 : /*
1445 : * If the user did not opt to run as the owner of the subscription
1446 : * ('run_as_owner'), then copy the table as the owner of the table.
1447 : */
1448 214 : run_as_owner = MySubscription->runasowner;
1449 214 : if (!run_as_owner)
1450 213 : SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
1451 :
1452 : /*
1453 : * Check that our table sync worker has permission to insert into the
1454 : * target table.
1455 : */
1456 214 : aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1457 : ACL_INSERT);
1458 214 : if (aclresult != ACLCHECK_OK)
1459 0 : aclcheck_error(aclresult,
1460 0 : get_relkind_objtype(rel->rd_rel->relkind),
1461 0 : RelationGetRelationName(rel));
1462 :
1463 : /*
1464 : * COPY FROM does not honor RLS policies. That is not a problem for
1465 : * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
1466 : * who has it implicitly), but other roles should not be able to
1467 : * circumvent RLS. Disallow logical replication into RLS enabled
1468 : * relations for such roles.
1469 : */
1470 214 : if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
1471 0 : ereport(ERROR,
1472 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1473 : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1474 : GetUserNameFromId(GetUserId(), true),
1475 : RelationGetRelationName(rel))));
1476 :
1477 : /* Now do the initial data copy */
1478 214 : PushActiveSnapshot(GetTransactionSnapshot());
1479 214 : copy_table(rel);
1480 199 : PopActiveSnapshot();
1481 :
1482 199 : res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
1483 199 : if (res->status != WALRCV_OK_COMMAND)
1484 0 : ereport(ERROR,
1485 : (errcode(ERRCODE_CONNECTION_FAILURE),
1486 : errmsg("table copy could not finish transaction on publisher: %s",
1487 : res->err)));
1488 199 : walrcv_clear_result(res);
1489 :
1490 199 : if (!run_as_owner)
1491 198 : RestoreUserContext(&ucxt);
1492 :
1493 199 : table_close(rel, NoLock);
1494 :
1495 : /* Make the copy visible. */
1496 199 : CommandCounterIncrement();
1497 :
1498 : /*
1499 : * Update the persisted state to indicate the COPY phase is done; make it
1500 : * visible to others.
1501 : */
1502 199 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1503 199 : MyLogicalRepWorker->relid,
1504 : SUBREL_STATE_FINISHEDCOPY,
1505 199 : MyLogicalRepWorker->relstate_lsn,
1506 : false);
1507 :
1508 199 : CommitTransactionCommand();
1509 :
 : /* The SUBREL_STATE_FINISHEDCOPY path above jumps here, skipping the copy. */
1510 199 : copy_table_done:
1511 :
1512 199 : elog(DEBUG1,
1513 : "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
1514 : originname, LSN_FORMAT_ARGS(*origin_startpos));
1515 :
1516 : /*
1517 : * We are done with the initial data synchronization, update the state.
1518 : */
1519 199 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1520 199 : MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
1521 199 : MyLogicalRepWorker->relstate_lsn = *origin_startpos;
1522 199 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1523 :
1524 : /*
1525 : * Finally, wait until the leader apply worker tells us to catch up and
1526 : * then return to let LogicalRepApplyLoop do it.
1527 : */
1528 199 : wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
1529 199 : return slotname;
1530 : }
1531 :
1532 : /*
1533 : * Execute the initial sync with error handling. Disable the subscription,
1534 : * if it's required.
1535 : *
1536 : * Allocate the slot name in long-lived context on return. Note that we don't
1537 : * handle FATAL errors which are probably because of system resource error and
1538 : * are not repeatable.
1539 : */
1540 : static void
1541 214 : start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
1542 : {
1543 214 : char *sync_slotname = NULL;
1544 :
1545 : Assert(am_tablesync_worker());
1546 :
1547 214 : PG_TRY();
1548 : {
1549 : /* Call initial sync. */
1550 214 : sync_slotname = LogicalRepSyncTableStart(origin_startpos);
1551 : }
1552 14 : PG_CATCH();
1553 : {
1554 14 : if (MySubscription->disableonerr)
 : /* NOTE(review): DisableSubscriptionAndExit is expected not to return;
 : * if it did, sync_slotname below would still be NULL -- confirm. */
1555 1 : DisableSubscriptionAndExit();
1556 : else
1557 : {
1558 : /*
1559 : * Report the worker failed during table synchronization. Abort
1560 : * the current transaction so that the stats message is sent in an
1561 : * idle state.
1562 : */
1563 13 : AbortOutOfAnyTransaction();
1564 13 : pgstat_report_subscription_error(MySubscription->oid);
1565 :
1566 13 : PG_RE_THROW();
1567 : }
1568 : }
1569 199 : PG_END_TRY();
1570 :
1571 : /* allocate slot name in long-lived context */
1572 199 : *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
1573 199 : pfree(sync_slotname);
1574 : }
1575 :
1576 : /*
1577 : * Runs the tablesync worker.
1578 : *
1579 : * It starts syncing tables. After a successful sync, sets streaming options
1580 : * and starts streaming to catchup with apply worker.
1581 : */
1582 : static void
1583 214 : run_tablesync_worker(void)
1584 : {
1585 : char originname[NAMEDATALEN];
1586 214 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
1587 214 : char *slotname = NULL;
1588 : WalRcvStreamOptions options;
1589 :
 : /* Performs the initial COPY; on failure it either re-throws or exits
 : * (see start_table_sync), so reaching here means the copy succeeded. */
1590 214 : start_table_sync(&origin_startpos, &slotname);
1591 :
1592 199 : ReplicationOriginNameForLogicalRep(MySubscription->oid,
1593 199 : MyLogicalRepWorker->relid,
1594 : originname,
1595 : sizeof(originname));
1596 :
1597 199 : set_apply_error_context_origin(originname);
1598 :
1599 199 : set_stream_options(&options, slotname, &origin_startpos);
1600 :
1601 199 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
1602 :
1603 : /* Apply the changes till we catchup with the apply worker. */
 : /* NOTE(review): start_apply appears not to return here normally -- confirm. */
1604 199 : start_apply(origin_startpos);
1605 0 : }
1606 :
1607 : /* Logical Replication Tablesync worker entry point */
1608 : void
1609 214 : TableSyncWorkerMain(Datum main_arg)
1610 : {
 : /* main_arg carries this worker's slot index in the worker array. */
1611 214 : int worker_slot = DatumGetInt32(main_arg);
1612 :
1613 214 : SetupApplyOrSyncWorker(worker_slot);
1614 :
1615 214 : run_tablesync_worker();
1616 :
 : /* Only reached if run_tablesync_worker returns (sync path ended). */
1617 0 : FinishSyncWorker();
1618 : }
1619 :
1620 : /*
1621 : * If the subscription has no tables then return false.
1622 : *
1623 : * Otherwise, are all tablesyncs READY?
1624 : *
1625 : * Note: This function is not suitable to be called from outside of apply or
1626 : * tablesync workers because MySubscription needs to be already initialized.
1627 : */
1628 : bool
1629 186 : AllTablesyncsReady(void)
1630 : {
1631 : bool started_tx;
1632 : bool has_tables;
1633 :
1634 : /* We need up-to-date sync state info for subscription tables here. */
 : /* NOTE(review): table_states_not_ready is presumably a file-level cache
 : * refreshed by FetchRelationStates -- confirm in the full file. */
1635 186 : FetchRelationStates(&has_tables, NULL, &started_tx);
1636 :
 : /* FetchRelationStates may have opened a transaction; close it here. */
1637 186 : if (started_tx)
1638 : {
1639 16 : CommitTransactionCommand();
1640 16 : pgstat_report_stat(true);
1641 : }
1642 :
1643 : /*
1644 : * Return false when there are no tables in subscription or not all tables
1645 : * are in ready state; true otherwise.
1646 : */
1647 186 : return has_tables && (table_states_not_ready == NIL);
1648 : }
1649 :
1650 : /*
1651 : * Return whether the subscription currently has any tables.
1652 : *
1653 : * Note: Unlike HasSubscriptionTables(), this function relies on cached
1654 : * information for subscription tables. Additionally, it should not be
1655 : * invoked outside of apply or tablesync workers, as MySubscription must be
1656 : * initialized first.
1657 : */
1658 : bool
1659 103 : HasSubscriptionTablesCached(void)
1660 : {
1661 : bool started_tx;
1662 : bool has_tables;
1663 :
1664 : /* We need up-to-date subscription tables info here */
1665 103 : FetchRelationStates(&has_tables, NULL, &started_tx);
1666 :
 : /* FetchRelationStates may have opened a transaction; close it here. */
1667 103 : if (started_tx)
1668 : {
1669 0 : CommitTransactionCommand();
1670 0 : pgstat_report_stat(true);
1671 : }
1672 :
1673 103 : return has_tables;
1674 : }
1675 :
1676 : /*
1677 : * Update the two_phase state of the specified subscription in pg_subscription.
1678 : */
1679 : void
1680 10 : UpdateTwoPhaseState(Oid suboid, char new_state)
1681 : {
1682 : Relation rel;
1683 : HeapTuple tup;
1684 : bool nulls[Natts_pg_subscription];
1685 : bool replaces[Natts_pg_subscription];
1686 : Datum values[Natts_pg_subscription];
1687 :
 : /* Only the three defined two_phase states are legal inputs. */
1688 : Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
1689 : new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1690 : new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1691 :
 : /* NOTE(review): assumes the caller already runs inside a transaction -- confirm. */
1692 10 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1693 10 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
1694 10 : if (!HeapTupleIsValid(tup))
1695 0 : elog(ERROR,
1696 : "cache lookup failed for subscription oid %u",
1697 : suboid);
1698 :
1699 : /* Form a new tuple. */
1700 10 : memset(values, 0, sizeof(values));
1701 10 : memset(nulls, false, sizeof(nulls));
1702 10 : memset(replaces, false, sizeof(replaces));
1703 :
1704 : /* And update/set two_phase state */
1705 10 : values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1706 10 : replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1707 :
1708 10 : tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1709 : values, nulls, replaces);
1710 10 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1711 :
1712 10 : heap_freetuple(tup);
1713 10 : table_close(rel, RowExclusiveLock);
1714 10 : }
|