Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * launcher.c
3 : * PostgreSQL logical replication worker launcher process
4 : *
5 : * Copyright (c) 2016-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/launcher.c
9 : *
10 : * NOTES
11 : * This module contains the logical replication worker launcher which
12 : * uses the background worker infrastructure to start the logical
13 : * replication workers for every enabled subscription.
14 : *
15 : *-------------------------------------------------------------------------
16 : */
17 :
18 : #include "postgres.h"
19 :
20 : #include "access/heapam.h"
21 : #include "access/htup.h"
22 : #include "access/htup_details.h"
23 : #include "access/tableam.h"
24 : #include "access/xact.h"
25 : #include "catalog/pg_subscription.h"
26 : #include "catalog/pg_subscription_rel.h"
27 : #include "funcapi.h"
28 : #include "lib/dshash.h"
29 : #include "miscadmin.h"
30 : #include "pgstat.h"
31 : #include "postmaster/bgworker.h"
32 : #include "postmaster/interrupt.h"
33 : #include "replication/logicallauncher.h"
34 : #include "replication/origin.h"
35 : #include "replication/slot.h"
36 : #include "replication/walreceiver.h"
37 : #include "replication/worker_internal.h"
38 : #include "storage/ipc.h"
39 : #include "storage/proc.h"
40 : #include "storage/procarray.h"
41 : #include "tcop/tcopprot.h"
42 : #include "utils/builtins.h"
43 : #include "utils/memutils.h"
44 : #include "utils/pg_lsn.h"
45 : #include "utils/snapmgr.h"
46 : #include "utils/syscache.h"
47 :
48 : /* max sleep time between cycles (3min) */
49 : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
50 :
51 : /* GUC variables */
52 : int max_logical_replication_workers = 4;
53 : int max_sync_workers_per_subscription = 2;
54 : int max_parallel_apply_workers_per_subscription = 2;
55 :
56 : LogicalRepWorker *MyLogicalRepWorker = NULL;
57 :
58 : typedef struct LogicalRepCtxStruct
59 : {
60 : /* Supervisor process. */
61 : pid_t launcher_pid;
62 :
63 : /* Hash table holding last start times of subscriptions' apply workers. */
64 : dsa_handle last_start_dsa;
65 : dshash_table_handle last_start_dsh;
66 :
67 : /* Background workers. */
68 : LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
69 : } LogicalRepCtxStruct;
70 :
71 : static LogicalRepCtxStruct *LogicalRepCtx;
72 :
73 : /* an entry in the last-start-times shared hash table */
74 : typedef struct LauncherLastStartTimesEntry
75 : {
76 : Oid subid; /* OID of logrep subscription (hash key) */
77 : TimestampTz last_start_time; /* last time its apply worker was started */
78 : } LauncherLastStartTimesEntry;
79 :
80 : /* parameters for the last-start-times shared hash table */
81 : static const dshash_parameters dsh_params = {
82 : sizeof(Oid),
83 : sizeof(LauncherLastStartTimesEntry),
84 : dshash_memcmp,
85 : dshash_memhash,
86 : dshash_memcpy,
87 : LWTRANCHE_LAUNCHER_HASH
88 : };
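/*
 * Editor's note (assumption, not part of launcher.c): the positional
 * initializers above map to dshash_parameters as key_size, entry_size,
 * compare_function, hash_function, copy_function, and tranche_id.  The
 * Oid subid member at the start of LauncherLastStartTimesEntry is the
 * hash key, which is why key_size is sizeof(Oid).
 */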
89 :
90 : static dsa_area *last_start_times_dsa = NULL;
91 : static dshash_table *last_start_times = NULL;
92 :
93 : static bool on_commit_launcher_wakeup = false;
94 :
95 :
96 : static void logicalrep_launcher_onexit(int code, Datum arg);
97 : static void logicalrep_worker_onexit(int code, Datum arg);
98 : static void logicalrep_worker_detach(void);
99 : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
100 : static int logicalrep_pa_worker_count(Oid subid);
101 : static void logicalrep_launcher_attach_dshmem(void);
102 : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
103 : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
104 : static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
105 : static bool acquire_conflict_slot_if_exists(void);
106 : static void update_conflict_slot_xmin(TransactionId new_xmin);
107 : static void init_conflict_slot_xmin(void);
108 :
109 :
110 : /*
111 : * Load the list of subscriptions.
112 : *
113 : * Only the fields of interest to the worker start/stop functions are filled for
114 : * each subscription.
115 : */
116 : static List *
117 5764 : get_subscription_list(void)
118 : {
119 5764 : List *res = NIL;
120 : Relation rel;
121 : TableScanDesc scan;
122 : HeapTuple tup;
123 : MemoryContext resultcxt;
124 :
125 : /* This is the context that we will allocate our output data in */
126 5764 : resultcxt = CurrentMemoryContext;
127 :
128 : /*
129 : * Start a transaction so we can access pg_subscription.
130 : */
131 5764 : StartTransactionCommand();
132 :
133 5764 : rel = table_open(SubscriptionRelationId, AccessShareLock);
134 5764 : scan = table_beginscan_catalog(rel, 0, NULL);
135 :
136 7512 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
137 : {
138 1748 : Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
139 : Subscription *sub;
140 : MemoryContext oldcxt;
141 :
142 : /*
143 : * Allocate our results in the caller's context, not the
144 : * transaction's. We do this inside the loop, and restore the original
145 : * context at the end, so that leaky things like heap_getnext() are
146 : * not called in a potentially long-lived context.
147 : */
148 1748 : oldcxt = MemoryContextSwitchTo(resultcxt);
149 :
150 1748 : sub = (Subscription *) palloc0(sizeof(Subscription));
151 1748 : sub->oid = subform->oid;
152 1748 : sub->dbid = subform->subdbid;
153 1748 : sub->owner = subform->subowner;
154 1748 : sub->enabled = subform->subenabled;
155 1748 : sub->name = pstrdup(NameStr(subform->subname));
156 1748 : sub->retaindeadtuples = subform->subretaindeadtuples;
157 1748 : sub->retentionactive = subform->subretentionactive;
158 : /* We don't fill fields we are not interested in. */
159 :
160 1748 : res = lappend(res, sub);
161 1748 : MemoryContextSwitchTo(oldcxt);
162 : }
163 :
164 5764 : table_endscan(scan);
165 5764 : table_close(rel, AccessShareLock);
166 :
167 5764 : CommitTransactionCommand();
168 :
169 5764 : return res;
170 : }
171 :
172 : /*
173 : * Wait for a background worker to start up and attach to the shmem context.
174 : *
175 : * This is only needed for cleaning up the shared memory in case the worker
176 : * fails to attach.
177 : *
178 : * Returns whether the attach was successful.
179 : */
180 : static bool
181 838 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
182 : uint16 generation,
183 : BackgroundWorkerHandle *handle)
184 : {
185 838 : bool result = false;
186 838 : bool dropped_latch = false;
187 :
188 : for (;;)
189 3042 : {
190 : BgwHandleStatus status;
191 : pid_t pid;
192 : int rc;
193 :
194 3880 : CHECK_FOR_INTERRUPTS();
195 :
196 3880 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
197 :
198 : /* Worker either died or has started. Return false if died. */
199 3880 : if (!worker->in_use || worker->proc)
200 : {
201 838 : result = worker->in_use;
202 838 : LWLockRelease(LogicalRepWorkerLock);
203 838 : break;
204 : }
205 :
206 3042 : LWLockRelease(LogicalRepWorkerLock);
207 :
208 : /* Check if worker has died before attaching, and clean up after it. */
209 3042 : status = GetBackgroundWorkerPid(handle, &pid);
210 :
211 3042 : if (status == BGWH_STOPPED)
212 : {
213 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
214 : /* Ensure that this was indeed the worker we waited for. */
215 0 : if (generation == worker->generation)
216 0 : logicalrep_worker_cleanup(worker);
217 0 : LWLockRelease(LogicalRepWorkerLock);
218 0 : break; /* result is already false */
219 : }
220 :
221 : /*
222 : * We need a timeout because we generally aren't notified via the latch
223 : * about the worker attach. But we don't expect to have to wait long.
224 : */
225 3042 : rc = WaitLatch(MyLatch,
226 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
227 : 10L, WAIT_EVENT_BGWORKER_STARTUP);
228 :
229 3042 : if (rc & WL_LATCH_SET)
230 : {
231 1010 : ResetLatch(MyLatch);
232 1010 : CHECK_FOR_INTERRUPTS();
233 1010 : dropped_latch = true;
234 : }
235 : }
236 :
237 : /*
238 : * If we had to clear a latch event in order to wait, be sure to restore
239 : * it before exiting. Otherwise the caller may miss events.
240 : */
241 838 : if (dropped_latch)
242 838 : SetLatch(MyLatch);
243 :
244 838 : return result;
245 : }
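/*
 * Editor's sketch (assumption, not part of launcher.c; LAUNCHER_USAGE_SKETCH
 * is a hypothetical guard): why the latch is re-set before returning above.
 * A typical caller consumes latch events in a loop like the one below; if
 * WaitForReplicationWorkerAttach() swallowed a latch event without restoring
 * it, the caller's next WaitLatch() could sleep through work that was
 * signaled while we were waiting for the attach.
 */
#ifdef LAUNCHER_USAGE_SKETCH
static void
caller_event_loop_sketch(void)
{
	for (;;)
	{
		int			rc;

		/* ... do one cycle of work, possibly launching workers ... */

		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   DEFAULT_NAPTIME_PER_CYCLE,
					   WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}
	}
}
#endif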
246 :
247 : /*
248 : * Walks the workers array and searches for one that matches the given
249 : * worker type, subscription id, and relation id.
250 : *
251 : * For both apply workers and sequencesync workers, the relid should be set to
252 : * InvalidOid, as these workers handle changes across all tables and sequences
253 : * respectively, rather than targeting a specific relation. For tablesync
254 : * workers, the relid should be set to the OID of the relation being
255 : * synchronized.
256 : */
257 : LogicalRepWorker *
258 6034 : logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid,
259 : bool only_running)
260 : {
261 : int i;
262 6034 : LogicalRepWorker *res = NULL;
263 :
264 : /* relid must be valid only for table sync workers */
265 : Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
266 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
267 :
268 : /* Search for an attached worker that matches the specified criteria. */
269 18208 : for (i = 0; i < max_logical_replication_workers; i++)
270 : {
271 15926 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
272 :
273 : /* Skip parallel apply workers. */
274 15926 : if (isParallelApplyWorker(w))
275 0 : continue;
276 :
277 15926 : if (w->in_use && w->subid == subid && w->relid == relid &&
278 3808 : w->type == wtype && (!only_running || w->proc))
279 : {
280 3752 : res = w;
281 3752 : break;
282 : }
283 : }
284 :
285 6034 : return res;
286 : }
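/*
 * Editor's sketch (assumption, not part of launcher.c): the caller contract
 * for logicalrep_worker_find().  LogicalRepWorkerLock must already be held,
 * and the returned pointer is only safe to dereference while it is held;
 * this mirrors the pattern used by logicalrep_worker_wakeup() below.
 */
#ifdef LAUNCHER_USAGE_SKETCH
static bool
apply_worker_is_running_sketch(Oid subid)
{
	LogicalRepWorker *worker;
	bool		running;

	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
	worker = logicalrep_worker_find(WORKERTYPE_APPLY, subid, InvalidOid,
									true /* only_running */ );
	running = (worker != NULL);
	LWLockRelease(LogicalRepWorkerLock);

	return running;
}
#endif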
287 :
288 : /*
289 : * Similar to logicalrep_worker_find(), but returns a list of all workers for
290 : * the subscription, instead of just one.
291 : */
292 : List *
293 1310 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
294 : {
295 : int i;
296 1310 : List *res = NIL;
297 :
298 1310 : if (acquire_lock)
299 238 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
300 :
301 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
302 :
303 : /* Search for attached worker for a given subscription id. */
304 6786 : for (i = 0; i < max_logical_replication_workers; i++)
305 : {
306 5476 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
307 :
308 5476 : if (w->in_use && w->subid == subid && (!only_running || w->proc))
309 936 : res = lappend(res, w);
310 : }
311 :
312 1310 : if (acquire_lock)
313 238 : LWLockRelease(LogicalRepWorkerLock);
314 :
315 1310 : return res;
316 : }
317 :
318 : /*
319 : * Start new logical replication background worker, if possible.
320 : *
321 : * Returns true on success, false on failure.
322 : */
323 : bool
324 838 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
325 : Oid dbid, Oid subid, const char *subname, Oid userid,
326 : Oid relid, dsm_handle subworker_dsm,
327 : bool retain_dead_tuples)
328 : {
329 : BackgroundWorker bgw;
330 : BackgroundWorkerHandle *bgw_handle;
331 : uint16 generation;
332 : int i;
333 838 : int slot = 0;
334 838 : LogicalRepWorker *worker = NULL;
335 : int nsyncworkers;
336 : int nparallelapplyworkers;
337 : TimestampTz now;
338 838 : bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
339 838 : bool is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC);
340 838 : bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
341 :
342 : /*----------
343 : * Sanity checks:
344 : * - must be valid worker type
345 : * - tablesync workers are the only ones to have a relid
346 : * - parallel apply worker is the only kind of subworker
347 : * - The replication slot used in conflict detection is created when
348 : * retain_dead_tuples is enabled
349 : */
350 : Assert(wtype != WORKERTYPE_UNKNOWN);
351 : Assert(is_tablesync_worker == OidIsValid(relid));
352 : Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
353 : Assert(!retain_dead_tuples || MyReplicationSlot);
354 :
355 838 : ereport(DEBUG1,
356 : (errmsg_internal("starting logical replication worker for subscription \"%s\"",
357 : subname)));
358 :
359 : /* Report this after the initial starting message for consistency. */
360 838 : if (max_active_replication_origins == 0)
361 0 : ereport(ERROR,
362 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
363 : errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
364 :
365 : /*
366 : * We need to modify the shared memory under the lock so that we have a
367 : * consistent view.
368 : */
369 838 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
370 :
371 838 : retry:
372 : /* Find unused worker slot. */
373 1466 : for (i = 0; i < max_logical_replication_workers; i++)
374 : {
375 1466 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
376 :
377 1466 : if (!w->in_use)
378 : {
379 838 : worker = w;
380 838 : slot = i;
381 838 : break;
382 : }
383 : }
384 :
385 838 : nsyncworkers = logicalrep_sync_worker_count(subid);
386 :
387 838 : now = GetCurrentTimestamp();
388 :
389 : /*
390 : * If we didn't find a free slot, try to do garbage collection. We do this
391 : * because, if some worker failed to start up and its parent crashed while
392 : * waiting, the in_use state was never cleared.
393 : */
394 838 : if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
395 : {
396 0 : bool did_cleanup = false;
397 :
398 0 : for (i = 0; i < max_logical_replication_workers; i++)
399 : {
400 0 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
401 :
402 : /*
403 : * If the worker was marked in use but didn't manage to attach in
404 : * time, clean it up.
405 : */
406 0 : if (w->in_use && !w->proc &&
407 0 : TimestampDifferenceExceeds(w->launch_time, now,
408 : wal_receiver_timeout))
409 : {
410 0 : elog(WARNING,
411 : "logical replication worker for subscription %u took too long to start; canceled",
412 : w->subid);
413 :
414 0 : logicalrep_worker_cleanup(w);
415 0 : did_cleanup = true;
416 : }
417 : }
418 :
419 0 : if (did_cleanup)
420 0 : goto retry;
421 : }
422 :
423 : /*
424 : * We don't allow invoking more sync workers once we have reached the
425 : * sync worker limit per subscription. So, just return silently, as we
426 : * might get here because of an otherwise harmless race condition.
427 : */
428 838 : if ((is_tablesync_worker || is_sequencesync_worker) &&
429 410 : nsyncworkers >= max_sync_workers_per_subscription)
430 : {
431 0 : LWLockRelease(LogicalRepWorkerLock);
432 0 : return false;
433 : }
434 :
435 838 : nparallelapplyworkers = logicalrep_pa_worker_count(subid);
436 :
437 : /*
438 : * Return false if the number of parallel apply workers has reached the limit
439 : * per subscription.
440 : */
441 838 : if (is_parallel_apply_worker &&
442 20 : nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
443 : {
444 0 : LWLockRelease(LogicalRepWorkerLock);
445 0 : return false;
446 : }
447 :
448 : /*
449 : * However, if there are no more free worker slots, inform the user about it
450 : * before exiting.
451 : */
452 838 : if (worker == NULL)
453 : {
454 0 : LWLockRelease(LogicalRepWorkerLock);
455 0 : ereport(WARNING,
456 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
457 : errmsg("out of logical replication worker slots"),
458 : errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
459 0 : return false;
460 : }
461 :
462 : /* Prepare the worker slot. */
463 838 : worker->type = wtype;
464 838 : worker->launch_time = now;
465 838 : worker->in_use = true;
466 838 : worker->generation++;
467 838 : worker->proc = NULL;
468 838 : worker->dbid = dbid;
469 838 : worker->userid = userid;
470 838 : worker->subid = subid;
471 838 : worker->relid = relid;
472 838 : worker->relstate = SUBREL_STATE_UNKNOWN;
473 838 : worker->relstate_lsn = InvalidXLogRecPtr;
474 838 : worker->stream_fileset = NULL;
475 838 : worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
476 838 : worker->parallel_apply = is_parallel_apply_worker;
477 838 : worker->oldest_nonremovable_xid = retain_dead_tuples
478 4 : ? MyReplicationSlot->data.xmin
479 838 : : InvalidTransactionId;
480 838 : worker->last_lsn = InvalidXLogRecPtr;
481 838 : TIMESTAMP_NOBEGIN(worker->last_send_time);
482 838 : TIMESTAMP_NOBEGIN(worker->last_recv_time);
483 838 : worker->reply_lsn = InvalidXLogRecPtr;
484 838 : TIMESTAMP_NOBEGIN(worker->reply_time);
485 838 : worker->last_seqsync_start_time = 0;
486 :
487 : /* Before releasing lock, remember generation for future identification. */
488 838 : generation = worker->generation;
489 :
490 838 : LWLockRelease(LogicalRepWorkerLock);
491 :
492 : /* Register the new dynamic worker. */
493 838 : memset(&bgw, 0, sizeof(bgw));
494 838 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
495 : BGWORKER_BACKEND_DATABASE_CONNECTION;
496 838 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
497 838 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
498 :
499 838 : switch (worker->type)
500 : {
501 408 : case WORKERTYPE_APPLY:
502 408 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
503 408 : snprintf(bgw.bgw_name, BGW_MAXLEN,
504 : "logical replication apply worker for subscription %u",
505 : subid);
506 408 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
507 408 : break;
508 :
509 20 : case WORKERTYPE_PARALLEL_APPLY:
510 20 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
511 20 : snprintf(bgw.bgw_name, BGW_MAXLEN,
512 : "logical replication parallel apply worker for subscription %u",
513 : subid);
514 20 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
515 :
516 20 : memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
517 20 : break;
518 :
519 18 : case WORKERTYPE_SEQUENCESYNC:
520 18 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain");
521 18 : snprintf(bgw.bgw_name, BGW_MAXLEN,
522 : "logical replication sequencesync worker for subscription %u",
523 : subid);
524 18 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker");
525 18 : break;
526 :
527 392 : case WORKERTYPE_TABLESYNC:
528 392 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain");
529 392 : snprintf(bgw.bgw_name, BGW_MAXLEN,
530 : "logical replication tablesync worker for subscription %u sync %u",
531 : subid,
532 : relid);
533 392 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
534 392 : break;
535 :
536 0 : case WORKERTYPE_UNKNOWN:
537 : /* Should never happen. */
538 0 : elog(ERROR, "unknown worker type");
539 : }
540 :
541 838 : bgw.bgw_restart_time = BGW_NEVER_RESTART;
542 838 : bgw.bgw_notify_pid = MyProcPid;
543 838 : bgw.bgw_main_arg = Int32GetDatum(slot);
544 :
545 838 : if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
546 : {
547 : /* Failed to start worker, so clean up the worker slot. */
548 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
549 : Assert(generation == worker->generation);
550 0 : logicalrep_worker_cleanup(worker);
551 0 : LWLockRelease(LogicalRepWorkerLock);
552 :
553 0 : ereport(WARNING,
554 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
555 : errmsg("out of background worker slots"),
556 : errhint("You might need to increase \"%s\".", "max_worker_processes")));
557 0 : return false;
558 : }
559 :
560 : /* Now wait until it attaches. */
561 838 : return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
562 : }
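/*
 * Editor's sketch (assumption, not part of launcher.c): a minimal tablesync
 * launch, illustrating the parameter contract asserted at the top of
 * logicalrep_worker_launch() -- relid is set only for tablesync workers,
 * subworker_dsm only for parallel apply workers, and retain_dead_tuples
 * requires the conflict detection slot to already be acquired.
 */
#ifdef LAUNCHER_USAGE_SKETCH
static bool
launch_tablesync_sketch(Subscription *sub, Oid relid)
{
	return logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
									sub->dbid, sub->oid, sub->name,
									sub->owner, relid,
									DSM_HANDLE_INVALID,
									false /* retain_dead_tuples */ );
}
#endif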
563 :
564 : /*
565 : * Internal function to stop the worker and wait until it detaches from the
566 : * slot.
567 : */
568 : static void
569 164 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
570 : {
571 : uint16 generation;
572 :
573 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
574 :
575 : /*
576 : * Remember which generation our worker was, so we can check whether what
577 : * we see is still the same one.
578 : */
579 164 : generation = worker->generation;
580 :
581 : /*
582 : * If we found a worker but it does not have proc set then it is still
583 : * starting up; wait for it to finish starting and then kill it.
584 : */
585 166 : while (worker->in_use && !worker->proc)
586 : {
587 : int rc;
588 :
589 8 : LWLockRelease(LogicalRepWorkerLock);
590 :
591 : /* Wait a bit --- we don't expect to have to wait long. */
592 8 : rc = WaitLatch(MyLatch,
593 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
594 : 10L, WAIT_EVENT_BGWORKER_STARTUP);
595 :
596 8 : if (rc & WL_LATCH_SET)
597 : {
598 2 : ResetLatch(MyLatch);
599 2 : CHECK_FOR_INTERRUPTS();
600 : }
601 :
602 : /* Recheck worker status. */
603 8 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
604 :
605 : /*
606 : * Check whether the worker slot is no longer used, which would mean
607 : * that the worker has exited, or whether the worker generation is
608 : * different, meaning that a different worker has taken the slot.
609 : */
610 8 : if (!worker->in_use || worker->generation != generation)
611 0 : return;
612 :
613 : /* Worker has assigned proc, so it has started. */
614 8 : if (worker->proc)
615 6 : break;
616 : }
617 :
618 : /* Now terminate the worker ... */
619 164 : kill(worker->proc->pid, signo);
620 :
621 : /* ... and wait for it to die. */
622 : for (;;)
623 242 : {
624 : int rc;
625 :
626 : /* is it gone? */
627 406 : if (!worker->proc || worker->generation != generation)
628 : break;
629 :
630 242 : LWLockRelease(LogicalRepWorkerLock);
631 :
632 : /* Wait a bit --- we don't expect to have to wait long. */
633 242 : rc = WaitLatch(MyLatch,
634 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
635 : 10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
636 :
637 242 : if (rc & WL_LATCH_SET)
638 : {
639 114 : ResetLatch(MyLatch);
640 114 : CHECK_FOR_INTERRUPTS();
641 : }
642 :
643 242 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
644 : }
645 : }
646 :
647 : /*
648 : * Stop the logical replication worker that matches the specified worker type,
649 : * subscription id, and relation id.
650 : */
651 : void
652 190 : logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
653 : {
654 : LogicalRepWorker *worker;
655 :
656 : /* relid must be valid only for table sync workers */
657 : Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
658 :
659 190 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
660 :
661 190 : worker = logicalrep_worker_find(wtype, subid, relid, false);
662 :
663 190 : if (worker)
664 : {
665 : Assert(!isParallelApplyWorker(worker));
666 148 : logicalrep_worker_stop_internal(worker, SIGTERM);
667 : }
668 :
669 190 : LWLockRelease(LogicalRepWorkerLock);
670 190 : }
671 :
672 : /*
673 : * Stop the given logical replication parallel apply worker.
674 : *
675 : * Note that the function sends SIGUSR2 instead of SIGTERM to the parallel apply
676 : * worker so that the worker exits cleanly.
677 : */
678 : void
679 10 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
680 : {
681 : int slot_no;
682 : uint16 generation;
683 : LogicalRepWorker *worker;
684 :
685 10 : SpinLockAcquire(&winfo->shared->mutex);
686 10 : generation = winfo->shared->logicalrep_worker_generation;
687 10 : slot_no = winfo->shared->logicalrep_worker_slot_no;
688 10 : SpinLockRelease(&winfo->shared->mutex);
689 :
690 : Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
691 :
692 : /*
693 : * Detach from the error_mq_handle for the parallel apply worker before
694 : * stopping it. This prevents the leader apply worker from trying to
695 : * receive the message from the error queue that might already be detached
696 : * by the parallel apply worker.
697 : */
698 10 : if (winfo->error_mq_handle)
699 : {
700 10 : shm_mq_detach(winfo->error_mq_handle);
701 10 : winfo->error_mq_handle = NULL;
702 : }
703 :
704 10 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
705 :
706 10 : worker = &LogicalRepCtx->workers[slot_no];
707 : Assert(isParallelApplyWorker(worker));
708 :
709 : /*
710 : * Only stop the worker if the generation matches and the worker is alive.
711 : */
712 10 : if (worker->generation == generation && worker->proc)
713 10 : logicalrep_worker_stop_internal(worker, SIGUSR2);
714 :
715 10 : LWLockRelease(LogicalRepWorkerLock);
716 10 : }
717 :
718 : /*
719 : * Wake up (using latch) any logical replication worker that matches the
720 : * specified worker type, subscription id, and relation id.
721 : */
722 : void
723 422 : logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
724 : {
725 : LogicalRepWorker *worker;
726 :
727 : /* relid must be valid only for table sync workers */
728 : Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
729 :
730 422 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
731 :
732 422 : worker = logicalrep_worker_find(wtype, subid, relid, true);
733 :
734 422 : if (worker)
735 422 : logicalrep_worker_wakeup_ptr(worker);
736 :
737 422 : LWLockRelease(LogicalRepWorkerLock);
738 422 : }
739 :
740 : /*
741 : * Wake up (using latch) the specified logical replication worker.
742 : *
743 : * Caller must hold lock, else worker->proc could change under us.
744 : */
745 : void
746 1300 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
747 : {
748 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
749 :
750 1300 : SetLatch(&worker->proc->procLatch);
751 1300 : }
752 :
753 : /*
754 : * Attach to a slot.
755 : */
756 : void
757 1068 : logicalrep_worker_attach(int slot)
758 : {
759 : /* Block concurrent access. */
760 1068 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
761 :
762 : Assert(slot >= 0 && slot < max_logical_replication_workers);
763 1068 : MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
764 :
765 1068 : if (!MyLogicalRepWorker->in_use)
766 : {
767 0 : LWLockRelease(LogicalRepWorkerLock);
768 0 : ereport(ERROR,
769 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
770 : errmsg("logical replication worker slot %d is empty, cannot attach",
771 : slot)));
772 : }
773 :
774 1068 : if (MyLogicalRepWorker->proc)
775 : {
776 0 : LWLockRelease(LogicalRepWorkerLock);
777 0 : ereport(ERROR,
778 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
779 : errmsg("logical replication worker slot %d is already used by "
780 : "another worker, cannot attach", slot)));
781 : }
782 :
783 1068 : MyLogicalRepWorker->proc = MyProc;
784 1068 : before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
785 :
786 1068 : LWLockRelease(LogicalRepWorkerLock);
787 1068 : }
788 :
789 : /*
790 : * Stop the parallel apply workers, if any, and detach the leader apply worker
791 : * (cleans up the worker info).
792 : */
793 : static void
794 1068 : logicalrep_worker_detach(void)
795 : {
796 : /* Stop the parallel apply workers. */
797 1068 : if (am_leader_apply_worker())
798 : {
799 : List *workers;
800 : ListCell *lc;
801 :
802 : /*
803 : * Detach from the error_mq_handle for all parallel apply workers
804 : * before terminating them. This prevents the leader apply worker from
805 : * receiving the worker termination message and writing it to the logs
806 : * when the parallel worker has already done so itself.
807 : */
808 634 : pa_detach_all_error_mq();
809 :
810 634 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
811 :
812 634 : workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
813 1278 : foreach(lc, workers)
814 : {
815 644 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
816 :
817 644 : if (isParallelApplyWorker(w))
818 6 : logicalrep_worker_stop_internal(w, SIGTERM);
819 : }
820 :
821 634 : LWLockRelease(LogicalRepWorkerLock);
822 :
823 634 : list_free(workers);
824 : }
825 :
826 : /* Block concurrent access. */
827 1068 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
828 :
829 1068 : logicalrep_worker_cleanup(MyLogicalRepWorker);
830 :
831 1068 : LWLockRelease(LogicalRepWorkerLock);
832 1068 : }
833 :
834 : /*
835 : * Clean up worker info.
836 : */
837 : static void
838 1068 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
839 : {
840 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
841 :
842 1068 : worker->type = WORKERTYPE_UNKNOWN;
843 1068 : worker->in_use = false;
844 1068 : worker->proc = NULL;
845 1068 : worker->dbid = InvalidOid;
846 1068 : worker->userid = InvalidOid;
847 1068 : worker->subid = InvalidOid;
848 1068 : worker->relid = InvalidOid;
849 1068 : worker->leader_pid = InvalidPid;
850 1068 : worker->parallel_apply = false;
851 1068 : }
852 :
853 : /*
854 : * Cleanup function for logical replication launcher.
855 : *
856 : * Called on logical replication launcher exit.
857 : */
858 : static void
859 860 : logicalrep_launcher_onexit(int code, Datum arg)
860 : {
861 860 : LogicalRepCtx->launcher_pid = 0;
862 860 : }
863 :
864 : /*
865 : * Reset the last_seqsync_start_time tracked for the sequencesync worker,
866 : * which is stored in the subscription's apply worker.
867 : *
868 : * Note that this value is not stored in the sequencesync worker, because that
869 : * has finished already and is about to exit.
870 : */
871 : void
872 10 : logicalrep_reset_seqsync_start_time(void)
873 : {
874 : LogicalRepWorker *worker;
875 :
876 : /*
877 : * The apply worker can't access last_seqsync_start_time concurrently, so
878 : * it is okay to use a SHARED lock here. See ProcessSequencesForSync().
879 : */
880 10 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
881 :
882 10 : worker = logicalrep_worker_find(WORKERTYPE_APPLY,
883 10 : MyLogicalRepWorker->subid, InvalidOid,
884 : true);
885 10 : if (worker)
886 10 : worker->last_seqsync_start_time = 0;
887 :
888 10 : LWLockRelease(LogicalRepWorkerLock);
889 10 : }
890 :
891 : /*
892 : * Cleanup function.
893 : *
894 : * Called on logical replication worker exit.
895 : */
896 : static void
897 1068 : logicalrep_worker_onexit(int code, Datum arg)
898 : {
899 : /* Disconnect gracefully from the remote side. */
900 1068 : if (LogRepWorkerWalRcvConn)
901 846 : walrcv_disconnect(LogRepWorkerWalRcvConn);
902 :
903 1068 : logicalrep_worker_detach();
904 :
905 : /* Cleanup fileset used for streaming transactions. */
906 1068 : if (MyLogicalRepWorker->stream_fileset != NULL)
907 28 : FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
908 :
909 : /*
910 : * Session level locks may be acquired outside of a transaction in
911 : * parallel apply mode and will not be released when the worker
912 : * terminates, so manually release all locks before the worker exits.
913 : *
914 : * The locks will be acquired once the worker is initialized.
915 : */
916 1068 : if (!InitializingApplyWorker)
917 948 : LockReleaseAll(DEFAULT_LOCKMETHOD, true);
918 :
919 1068 : ApplyLauncherWakeup();
920 1068 : }
921 :
922 : /*
923 : * Count the number of registered (not necessarily running) sync workers
924 : * for a subscription.
925 : */
926 : int
927 2436 : logicalrep_sync_worker_count(Oid subid)
928 : {
929 : int i;
930 2436 : int res = 0;
931 :
932 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
933 :
934 : /* Search for attached worker for a given subscription id. */
935 12576 : for (i = 0; i < max_logical_replication_workers; i++)
936 : {
937 10140 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
938 :
939 10140 : if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w)))
940 2586 : res++;
941 : }
942 :
943 2436 : return res;
944 : }
945 :
946 : /*
947 : * Count the number of registered (but not necessarily running) parallel apply
948 : * workers for a subscription.
949 : */
950 : static int
951 838 : logicalrep_pa_worker_count(Oid subid)
952 : {
953 : int i;
954 838 : int res = 0;
955 :
956 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
957 :
958 : /*
959 : * Scan all attached parallel apply workers, only counting those which
960 : * have the given subscription id.
961 : */
962 4430 : for (i = 0; i < max_logical_replication_workers; i++)
963 : {
964 3592 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
965 :
966 3592 : if (isParallelApplyWorker(w) && w->subid == subid)
967 4 : res++;
968 : }
969 :
970 838 : return res;
971 : }
972 :
973 : /*
974 : * ApplyLauncherShmemSize
975 : * Compute space needed for replication launcher shared memory
976 : */
977 : Size
978 8500 : ApplyLauncherShmemSize(void)
979 : {
980 : Size size;
981 :
982 : /*
983 : * Need the fixed struct and the array of LogicalRepWorker.
984 : */
985 8500 : size = sizeof(LogicalRepCtxStruct);
986 8500 : size = MAXALIGN(size);
987 8500 : size = add_size(size, mul_size(max_logical_replication_workers,
988 : sizeof(LogicalRepWorker)));
989 8500 : return size;
990 : }
991 :
992 : /*
993 : * ApplyLauncherRegister
994 : * Register a background worker running the logical replication launcher.
995 : */
996 : void
997 1766 : ApplyLauncherRegister(void)
998 : {
999 : BackgroundWorker bgw;
1000 :
1001 : /*
1002 : * The logical replication launcher is disabled during binary upgrades, to
1003 : * prevent logical replication workers from running on the source cluster.
1004 : * That could cause replication origins to move forward after having been
1005 : * copied to the target cluster, potentially creating conflicts with the
1006 : * copied data files.
1007 : */
1008 1766 : if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
1009 106 : return;
1010 :
1011 1660 : memset(&bgw, 0, sizeof(bgw));
1012 1660 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
1013 : BGWORKER_BACKEND_DATABASE_CONNECTION;
1014 1660 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
1015 1660 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
1016 1660 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
1017 1660 : snprintf(bgw.bgw_name, BGW_MAXLEN,
1018 : "logical replication launcher");
1019 1660 : snprintf(bgw.bgw_type, BGW_MAXLEN,
1020 : "logical replication launcher");
1021 1660 : bgw.bgw_restart_time = 5;
1022 1660 : bgw.bgw_notify_pid = 0;
1023 1660 : bgw.bgw_main_arg = (Datum) 0;
1024 :
1025 1660 : RegisterBackgroundWorker(&bgw);
1026 : }
1027 :
1028 : /*
1029 : * ApplyLauncherShmemInit
1030 : * Allocate and initialize replication launcher shared memory
1031 : */
1032 : void
1033 2200 : ApplyLauncherShmemInit(void)
1034 : {
1035 : bool found;
1036 :
1037 2200 : LogicalRepCtx = (LogicalRepCtxStruct *)
1038 2200 : ShmemInitStruct("Logical Replication Launcher Data",
1039 : ApplyLauncherShmemSize(),
1040 : &found);
1041 :
1042 2200 : if (!found)
1043 : {
1044 : int slot;
1045 :
1046 2200 : memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
1047 :
1048 2200 : LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
1049 2200 : LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
1050 :
1051 : /* Initialize memory and spin locks for each worker slot. */
1052 10926 : for (slot = 0; slot < max_logical_replication_workers; slot++)
1053 : {
1054 8726 : LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
1055 :
1056 8726 : memset(worker, 0, sizeof(LogicalRepWorker));
1057 8726 : SpinLockInit(&worker->relmutex);
1058 : }
1059 : }
1060 2200 : }
1061 :
1062 : /*
1063 : * Initialize or attach to the dynamic shared hash table that stores the
1064 : * last-start times, if not already done.
1065 : * This must be called before accessing the table.
1066 : */
1067 : static void
1068 1502 : logicalrep_launcher_attach_dshmem(void)
1069 : {
1070 : MemoryContext oldcontext;
1071 :
1072 : /* Quick exit if we already did this. */
1073 1502 : if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
1074 1392 : last_start_times != NULL)
1075 1018 : return;
1076 :
1077 : /* Otherwise, use a lock to ensure only one process creates the table. */
1078 484 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
1079 :
1080 : /* Be sure any local memory allocated by DSA routines is persistent. */
1081 484 : oldcontext = MemoryContextSwitchTo(TopMemoryContext);
1082 :
1083 484 : if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
1084 : {
1085 : /* Initialize dynamic shared hash table for last-start times. */
1086 110 : last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
1087 110 : dsa_pin(last_start_times_dsa);
1088 110 : dsa_pin_mapping(last_start_times_dsa);
1089 110 : last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
1090 :
1091 : /* Store handles in shared memory for other backends to use. */
1092 110 : LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
1093 110 : LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
1094 : }
1095 374 : else if (!last_start_times)
1096 : {
1097 : /* Attach to existing dynamic shared hash table. */
1098 374 : last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
1099 374 : dsa_pin_mapping(last_start_times_dsa);
1100 374 : last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
1101 374 : LogicalRepCtx->last_start_dsh, NULL);
1102 : }
1103 :
1104 484 : MemoryContextSwitchTo(oldcontext);
1105 484 : LWLockRelease(LogicalRepWorkerLock);
1106 : }
1107 :
1108 : /*
1109 : * Set the last-start time for the subscription.
1110 : */
1111 : static void
1112 408 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
1113 : {
1114 : LauncherLastStartTimesEntry *entry;
1115 : bool found;
1116 :
1117 408 : logicalrep_launcher_attach_dshmem();
1118 :
1119 408 : entry = dshash_find_or_insert(last_start_times, &subid, &found);
1120 408 : entry->last_start_time = start_time;
1121 408 : dshash_release_lock(last_start_times, entry);
1122 408 : }
1123 :
1124 : /*
1125 : * Return the last-start time for the subscription, or 0 if there isn't one.
1126 : */
1127 : static TimestampTz
1128 642 : ApplyLauncherGetWorkerStartTime(Oid subid)
1129 : {
1130 : LauncherLastStartTimesEntry *entry;
1131 : TimestampTz ret;
1132 :
1133 642 : logicalrep_launcher_attach_dshmem();
1134 :
1135 642 : entry = dshash_find(last_start_times, &subid, false);
1136 642 : if (entry == NULL)
1137 284 : return 0;
1138 :
1139 358 : ret = entry->last_start_time;
1140 358 : dshash_release_lock(last_start_times, entry);
1141 :
1142 358 : return ret;
1143 : }
1144 :
1145 : /*
1146 : * Remove the last-start-time entry for the subscription, if one exists.
1147 : *
1148 : * This has two use-cases: to remove the entry related to a subscription
1149 : * that's been deleted or disabled (just to avoid leaking shared memory),
1150 : * and to allow immediate restart of an apply worker that has exited
1151 : * due to subscription parameter changes.
1152 : */
1153 : void
1154 452 : ApplyLauncherForgetWorkerStartTime(Oid subid)
1155 : {
1156 452 : logicalrep_launcher_attach_dshmem();
1157 :
1158 452 : (void) dshash_delete_key(last_start_times, &subid);
1159 452 : }
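/*
 * Editor's sketch (assumption, not part of launcher.c): the "immediate
 * restart" use-case described above.  After a subscription parameter
 * change, dropping the last-start entry lets the launcher restart the
 * apply worker without waiting for wal_retrieve_retry_interval to elapse.
 */
#ifdef LAUNCHER_USAGE_SKETCH
static void
restart_apply_worker_sketch(Oid subid)
{
	logicalrep_worker_stop(WORKERTYPE_APPLY, subid, InvalidOid);
	ApplyLauncherForgetWorkerStartTime(subid);
	ApplyLauncherWakeup();		/* prompt relaunch on the next cycle */
}
#endif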
1160 :
1161 : /*
1162 : * Wakeup the launcher on commit if requested.
1163 : */
1164 : void
1165 1110776 : AtEOXact_ApplyLauncher(bool isCommit)
1166 : {
1167 1110776 : if (isCommit)
1168 : {
1169 1059618 : if (on_commit_launcher_wakeup)
1170 288 : ApplyLauncherWakeup();
1171 : }
1172 :
1173 1110776 : on_commit_launcher_wakeup = false;
1174 1110776 : }
1175 :
1176 : /*
1177 : * Request wakeup of the launcher on commit of the transaction.
1178 : *
1179 : * This is used to send launcher signal to stop sleeping and process the
1180 : * subscriptions when current transaction commits. Should be used when new
1181 : * tuple was added to the pg_subscription catalog.
1182 : */
1183 : void
1184 290 : ApplyLauncherWakeupAtCommit(void)
1185 : {
1186 290 : if (!on_commit_launcher_wakeup)
1187 288 : on_commit_launcher_wakeup = true;
1188 290 : }
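/*
 * Editor's sketch (assumption, not part of launcher.c): how subscription
 * DDL is expected to use the two functions above -- request the wakeup
 * while the catalog change is still uncommitted, and let
 * AtEOXact_ApplyLauncher() signal the launcher only if the transaction
 * actually commits.
 */
#ifdef LAUNCHER_USAGE_SKETCH
static void
subscription_ddl_sketch(void)
{
	/* ... insert or update the pg_subscription tuple here ... */

	ApplyLauncherWakeupAtCommit();

	/* on commit: AtEOXact_ApplyLauncher(true) -> ApplyLauncherWakeup() */
}
#endif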
1189 :
1190 : /*
1191 : * Wakeup the launcher immediately.
1192 : */
1193 : void
1194 1428 : ApplyLauncherWakeup(void)
1195 : {
1196 1428 : if (LogicalRepCtx->launcher_pid != 0)
1197 1394 : kill(LogicalRepCtx->launcher_pid, SIGUSR1);
1198 1428 : }
1199 :
1200 : /*
1201 : * Main loop for the apply launcher process.
1202 : */
1203 : void
1204 860 : ApplyLauncherMain(Datum main_arg)
1205 : {
1206 860 : ereport(DEBUG1,
1207 : (errmsg_internal("logical replication launcher started")));
1208 :
1209 860 : before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
1210 :
1211 : Assert(LogicalRepCtx->launcher_pid == 0);
1212 860 : LogicalRepCtx->launcher_pid = MyProcPid;
1213 :
1214 : /* Establish signal handlers. */
1215 860 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
1216 860 : pqsignal(SIGTERM, die);
1217 860 : BackgroundWorkerUnblockSignals();
1218 :
1219 : /*
1220 : * Establish connection to nailed catalogs (we only ever access
1221 : * pg_subscription).
1222 : */
1223 860 : BackgroundWorkerInitializeConnection(NULL, NULL, 0);
1224 :
1225 : /*
1226 : * Acquire the conflict detection slot at startup to ensure it can be
1227 : * dropped if no longer needed after a restart.
1228 : */
1229 860 : acquire_conflict_slot_if_exists();
1230 :
1231 : /* Enter main loop */
1232 : for (;;)
1233 4906 : {
1234 : int rc;
1235 : List *sublist;
1236 : ListCell *lc;
1237 : MemoryContext subctx;
1238 : MemoryContext oldctx;
1239 5766 : long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
1240 5766 : bool can_update_xmin = true;
1241 5766 : bool retain_dead_tuples = false;
1242 5766 : TransactionId xmin = InvalidTransactionId;
1243 :
1244 5766 : CHECK_FOR_INTERRUPTS();
1245 :
1246 : /* Use temporary context to avoid leaking memory across cycles. */
1247 5764 : subctx = AllocSetContextCreate(TopMemoryContext,
1248 : "Logical Replication Launcher sublist",
1249 : ALLOCSET_DEFAULT_SIZES);
1250 5764 : oldctx = MemoryContextSwitchTo(subctx);
1251 :
1252 : /*
1253 : * Start any missing workers for enabled subscriptions.
1254 : *
1255 : * Also, during the iteration through all subscriptions, we compute
1256 : * the minimum XID required to protect deleted tuples for conflict
1257 : * detection if one of the subscription enables retain_dead_tuples
1258 : * option.
1259 : */
1260 5764 : sublist = get_subscription_list();
1261 7512 : foreach(lc, sublist)
1262 : {
1263 1748 : Subscription *sub = (Subscription *) lfirst(lc);
1264 : LogicalRepWorker *w;
1265 : TimestampTz last_start;
1266 : TimestampTz now;
1267 : long elapsed;
1268 :
1269 1748 : if (sub->retaindeadtuples)
1270 : {
1271 90 : retain_dead_tuples = true;
1272 :
1273 : /*
1274 : * Create a replication slot to retain information necessary
1275 : * for conflict detection such as dead tuples, commit
1276 : * timestamps, and origins.
1277 : *
1278 : * The slot is created before starting the apply worker to
1279 : * prevent it from unnecessarily maintaining its
1280 : * oldest_nonremovable_xid.
1281 : *
1282 : * The slot is created even for a disabled subscription to
1283 : * ensure that conflict-related information is available when
1284 : * applying remote changes that occurred before the
1285 : * subscription was enabled.
1286 : */
1287 90 : CreateConflictDetectionSlot();
1288 :
1289 90 : if (sub->retentionactive)
1290 : {
1291 : /*
1292 : * Can't advance xmin of the slot unless all the
1293 : * subscriptions actively retaining dead tuples are
1294 : * enabled. This is required to ensure that we don't
1295 : * advance the xmin of CONFLICT_DETECTION_SLOT if one of
1296 : * the subscriptions is not enabled. Otherwise, we won't
1297 : * be able to detect conflicts reliably for such a
1298 : * subscription even though it has set the
1299 : * retain_dead_tuples option.
1300 : */
1301 90 : can_update_xmin &= sub->enabled;
1302 :
1303 : /*
1304 : * Initialize the slot once the subscription activates
1305 : * retention.
1306 : */
1307 90 : if (!TransactionIdIsValid(MyReplicationSlot->data.xmin))
1308 0 : init_conflict_slot_xmin();
1309 : }
1310 : }
1311 :
1312 1748 : if (!sub->enabled)
1313 82 : continue;
1314 :
1315 1666 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1316 1666 : w = logicalrep_worker_find(WORKERTYPE_APPLY, sub->oid, InvalidOid,
1317 : false);
1318 :
1319 1666 : if (w != NULL)
1320 : {
1321 : /*
1322 : * Compute the minimum xmin required to protect dead tuples
1323 : * required for conflict detection among all running apply
1324 : * workers. This computation is performed while holding
1325 : * LogicalRepWorkerLock to prevent accessing invalid worker
1326 : * data, in scenarios where a worker might exit and reset its
1327 : * state concurrently.
1328 : */
1329 1024 : if (sub->retaindeadtuples &&
1330 80 : sub->retentionactive &&
1331 : can_update_xmin)
1332 80 : compute_min_nonremovable_xid(w, &xmin);
1333 :
1334 1024 : LWLockRelease(LogicalRepWorkerLock);
1335 :
1336 : /* worker is running already */
1337 1024 : continue;
1338 : }
1339 :
1340 642 : LWLockRelease(LogicalRepWorkerLock);
1341 :
1342 : /*
1343 : * Can't advance the xmin of the slot unless all the workers
1344 : * corresponding to subscriptions actively retaining dead tuples
1345 : * are running, so disable further computation of the minimum
1346 : * nonremovable xid.
1347 : */
1348 642 : if (sub->retaindeadtuples && sub->retentionactive)
1349 4 : can_update_xmin = false;
1350 :
1351 : /*
1352 : * If the worker is eligible to start now, launch it. Otherwise,
1353 : * adjust wait_time so that we'll wake up as soon as it can be
1354 : * started.
1355 : *
1356 : * Each subscription's apply worker can only be restarted once per
1357 : * wal_retrieve_retry_interval, so that errors do not cause us to
1358 : * repeatedly restart the worker as fast as possible. In cases
1359 : * where a restart is expected (e.g., subscription parameter
1360 : * changes), another process should remove the last-start entry
1361 : * for the subscription so that the worker can be restarted
1362 : * without waiting for wal_retrieve_retry_interval to elapse.
1363 : */
1364 642 : last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
1365 642 : now = GetCurrentTimestamp();
1366 642 : if (last_start == 0 ||
1367 358 : (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
1368 : {
1369 408 : ApplyLauncherSetWorkerStartTime(sub->oid, now);
1370 408 : if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
1371 408 : sub->dbid, sub->oid, sub->name,
1372 : sub->owner, InvalidOid,
1373 : DSM_HANDLE_INVALID,
1374 412 : sub->retaindeadtuples &&
1375 4 : sub->retentionactive))
1376 : {
1377 : /*
1378 : * We get here either if we failed to launch a worker
1379 : * (perhaps for resource-exhaustion reasons) or if we
1380 : * launched one but it immediately quit. Either way, it
1381 : * seems appropriate to try again after
1382 : * wal_retrieve_retry_interval.
1383 : */
1384 24 : wait_time = Min(wait_time,
1385 : wal_retrieve_retry_interval);
1386 : }
1387 : }
1388 : else
1389 : {
1390 234 : wait_time = Min(wait_time,
1391 : wal_retrieve_retry_interval - elapsed);
1392 : }
1393 : }
1394 :
1395 : /*
1396 : * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
1397 : * that requires us to retain dead tuples. Otherwise, if required,
1398 : * advance the slot's xmin to protect dead tuples required for the
1399 : * conflict detection.
1400 : *
1401 : * Additionally, if all apply workers for subscriptions with
1402 : * retain_dead_tuples enabled have requested to stop retention, the
1403 : * slot's xmin will be set to InvalidTransactionId, allowing the
1404 : * removal of dead tuples.
1405 : */
1406 5764 : if (MyReplicationSlot)
1407 : {
1408 92 : if (!retain_dead_tuples)
1409 2 : ReplicationSlotDropAcquired();
1410 90 : else if (can_update_xmin)
1411 80 : update_conflict_slot_xmin(xmin);
1412 : }
1413 :
1414 : /* Switch back to original memory context. */
1415 5764 : MemoryContextSwitchTo(oldctx);
1416 : /* Clean the temporary memory. */
1417 5764 : MemoryContextDelete(subctx);
1418 :
1419 : /* Wait for more work. */
1420 5764 : rc = WaitLatch(MyLatch,
1421 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1422 : wait_time,
1423 : WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
1424 :
1425 5758 : if (rc & WL_LATCH_SET)
1426 : {
1427 5714 : ResetLatch(MyLatch);
1428 5714 : CHECK_FOR_INTERRUPTS();
1429 : }
1430 :
1431 4906 : if (ConfigReloadPending)
1432 : {
1433 72 : ConfigReloadPending = false;
1434 72 : ProcessConfigFile(PGC_SIGHUP);
1435 : }
1436 : }
1437 :
1438 : /* Not reachable */
1439 : }
1440 :
1441 : /*
1442 : * Determine the minimum non-removable transaction ID across all apply workers
1443 : * for subscriptions that have retain_dead_tuples enabled. Store the result
1444 : * in *xmin.
1445 : */
1446 : static void
1447 80 : compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
1448 : {
1449 : TransactionId nonremovable_xid;
1450 :
1451 : Assert(worker != NULL);
1452 :
1453 : /*
1454 : * The replication slot for conflict detection must be created before the
1455 : * worker starts.
1456 : */
1457 : Assert(MyReplicationSlot);
1458 :
1459 80 : SpinLockAcquire(&worker->relmutex);
1460 80 : nonremovable_xid = worker->oldest_nonremovable_xid;
1461 80 : SpinLockRelease(&worker->relmutex);
1462 :
1463 : /*
1464 : * Return if the apply worker has stopped retention concurrently.
1465 : *
1466 : * Although this function is invoked only when retentionactive is true,
1467 : * the apply worker might stop retention after the launcher fetches the
1468 : * retentionactive flag.
1469 : */
1470 80 : if (!TransactionIdIsValid(nonremovable_xid))
1471 0 : return;
1472 :
1473 80 : if (!TransactionIdIsValid(*xmin) ||
1474 0 : TransactionIdPrecedes(nonremovable_xid, *xmin))
1475 80 : *xmin = nonremovable_xid;
1476 : }
1477 :
1478 : /*
1479 : * Acquire the replication slot used to retain information for conflict
1480 : * detection, if it exists.
1481 : *
1482 : * Return true if successfully acquired, otherwise return false.
1483 : */
1484 : static bool
1485 860 : acquire_conflict_slot_if_exists(void)
1486 : {
1487 860 : if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
1488 858 : return false;
1489 :
1490 2 : ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
1491 2 : return true;
1492 : }
1493 :
1494 : /*
1495 : * Update the xmin of the replication slot used to retain information required
1496 : * for conflict detection.
1497 : */
1498 : static void
1499 80 : update_conflict_slot_xmin(TransactionId new_xmin)
1500 : {
1501 : Assert(MyReplicationSlot);
1502 : Assert(!TransactionIdIsValid(new_xmin) ||
1503 : TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
1504 :
1505 : /* Return if the xmin value of the slot cannot be updated */
1506 80 : if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
1507 56 : return;
1508 :
1509 24 : SpinLockAcquire(&MyReplicationSlot->mutex);
1510 24 : MyReplicationSlot->effective_xmin = new_xmin;
1511 24 : MyReplicationSlot->data.xmin = new_xmin;
1512 24 : SpinLockRelease(&MyReplicationSlot->mutex);
1513 :
1514 24 : elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
1515 :
1516 24 : ReplicationSlotMarkDirty();
1517 24 : ReplicationSlotsComputeRequiredXmin(false);
1518 :
1519 : /*
1520 : * Like PhysicalConfirmReceivedLocation(), do not save slot information
1521 : * each time. This is acceptable because all concurrent transactions on
1522 : * the publisher that require the data preceding the slot's xmin should
1523 : * have already been applied and flushed on the subscriber before the xmin
1524 : * is advanced. So, even if the slot's xmin regresses after a restart, it
1525 : * will be advanced again in the next cycle. Therefore, no data required
1526 : * for conflict detection will be prematurely removed.
1527 : */
1528 24 : return;
1529 : }
1530 :
1531 : /*
1532 : * Initialize the xmin for the conflict detection slot.
1533 : */
1534 : static void
1535 8 : init_conflict_slot_xmin(void)
1536 : {
1537 : TransactionId xmin_horizon;
1538 :
1539 : /* Replication slot must exist but shouldn't be initialized. */
1540 : Assert(MyReplicationSlot &&
1541 : !TransactionIdIsValid(MyReplicationSlot->data.xmin));
1542 :
1543 8 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
1544 :
1545 8 : xmin_horizon = GetOldestSafeDecodingTransactionId(false);
1546 :
1547 8 : SpinLockAcquire(&MyReplicationSlot->mutex);
1548 8 : MyReplicationSlot->effective_xmin = xmin_horizon;
1549 8 : MyReplicationSlot->data.xmin = xmin_horizon;
1550 8 : SpinLockRelease(&MyReplicationSlot->mutex);
1551 :
1552 8 : ReplicationSlotsComputeRequiredXmin(true);
1553 :
1554 8 : LWLockRelease(ProcArrayLock);
1555 :
1556 : /* Write this slot to disk */
1557 8 : ReplicationSlotMarkDirty();
1558 8 : ReplicationSlotSave();
1559 8 : }
1560 :
1561 : /*
1562 : * Create and acquire the replication slot used to retain information for
1563 : * conflict detection, if that has not been done yet.
1564 : */
1565 : void
1566 92 : CreateConflictDetectionSlot(void)
1567 : {
1568 : /* Exit early if the replication slot is already created and acquired */
1569 92 : if (MyReplicationSlot)
1570 84 : return;
1571 :
1572 8 : ereport(LOG,
1573 : errmsg("creating replication conflict detection slot"));
1574 :
1575 8 : ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
1576 : false, false);
1577 :
1578 8 : init_conflict_slot_xmin();
1579 : }
1580 :
1581 : /*
1582 : * Is current process the logical replication launcher?
1583 : */
1584 : bool
1585 4920 : IsLogicalLauncher(void)
1586 : {
1587 4920 : return LogicalRepCtx->launcher_pid == MyProcPid;
1588 : }
1589 :
1590 : /*
1591 : * Return the pid of the leader apply worker if the given pid is the pid of a
1592 : * parallel apply worker, otherwise, return InvalidPid.
1593 : */
1594 : pid_t
1595 2104 : GetLeaderApplyWorkerPid(pid_t pid)
1596 : {
1597 2104 : int leader_pid = InvalidPid;
1598 : int i;
1599 :
1600 2104 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1601 :
1602 10520 : for (i = 0; i < max_logical_replication_workers; i++)
1603 : {
1604 8416 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
1605 :
1606 8416 : if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
1607 : {
1608 0 : leader_pid = w->leader_pid;
1609 0 : break;
1610 : }
1611 : }
1612 :
1613 2104 : LWLockRelease(LogicalRepWorkerLock);
1614 :
1615 2104 : return leader_pid;
1616 : }
1617 :
1618 : /*
1619 : * Returns state of the subscriptions.
1620 : */
1621 : Datum
1622 2 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
1623 : {
1624 : #define PG_STAT_GET_SUBSCRIPTION_COLS 10
1625 2 : Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
1626 : int i;
1627 2 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1628 :
1629 2 : InitMaterializedSRF(fcinfo, 0);
1630 :
1631 : /* Make sure we get consistent view of the workers. */
1632 2 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1633 :
1634 10 : for (i = 0; i < max_logical_replication_workers; i++)
1635 : {
1636 : /* for each row */
1637 8 : Datum values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1638 8 : bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1639 : int worker_pid;
1640 : LogicalRepWorker worker;
1641 :
1642 8 : memcpy(&worker, &LogicalRepCtx->workers[i],
1643 : sizeof(LogicalRepWorker));
1644 8 : if (!worker.proc || !IsBackendPid(worker.proc->pid))
1645 4 : continue;
1646 :
1647 4 : if (OidIsValid(subid) && worker.subid != subid)
1648 0 : continue;
1649 :
1650 4 : worker_pid = worker.proc->pid;
1651 :
1652 4 : values[0] = ObjectIdGetDatum(worker.subid);
1653 4 : if (isTableSyncWorker(&worker))
1654 0 : values[1] = ObjectIdGetDatum(worker.relid);
1655 : else
1656 4 : nulls[1] = true;
1657 4 : values[2] = Int32GetDatum(worker_pid);
1658 :
1659 4 : if (isParallelApplyWorker(&worker))
1660 0 : values[3] = Int32GetDatum(worker.leader_pid);
1661 : else
1662 4 : nulls[3] = true;
1663 :
1664 4 : if (!XLogRecPtrIsValid(worker.last_lsn))
1665 2 : nulls[4] = true;
1666 : else
1667 2 : values[4] = LSNGetDatum(worker.last_lsn);
1668 4 : if (worker.last_send_time == 0)
1669 0 : nulls[5] = true;
1670 : else
1671 4 : values[5] = TimestampTzGetDatum(worker.last_send_time);
1672 4 : if (worker.last_recv_time == 0)
1673 0 : nulls[6] = true;
1674 : else
1675 4 : values[6] = TimestampTzGetDatum(worker.last_recv_time);
1676 4 : if (!XLogRecPtrIsValid(worker.reply_lsn))
1677 2 : nulls[7] = true;
1678 : else
1679 2 : values[7] = LSNGetDatum(worker.reply_lsn);
1680 4 : if (worker.reply_time == 0)
1681 0 : nulls[8] = true;
1682 : else
1683 4 : values[8] = TimestampTzGetDatum(worker.reply_time);
1684 :
1685 4 : switch (worker.type)
1686 : {
1687 4 : case WORKERTYPE_APPLY:
1688 4 : values[9] = CStringGetTextDatum("apply");
1689 4 : break;
1690 0 : case WORKERTYPE_PARALLEL_APPLY:
1691 0 : values[9] = CStringGetTextDatum("parallel apply");
1692 0 : break;
1693 0 : case WORKERTYPE_SEQUENCESYNC:
1694 0 : values[9] = CStringGetTextDatum("sequence synchronization");
1695 0 : break;
1696 0 : case WORKERTYPE_TABLESYNC:
1697 0 : values[9] = CStringGetTextDatum("table synchronization");
1698 0 : break;
1699 0 : case WORKERTYPE_UNKNOWN:
1700 : /* Should never happen. */
1701 0 : elog(ERROR, "unknown worker type");
1702 : }
1703 :
1704 4 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1705 : values, nulls);
1706 :
1707 : /*
1708 : * If only a single subscription was requested, and we found it,
1709 : * break.
1710 : */
1711 4 : if (OidIsValid(subid))
1712 0 : break;
1713 : }
1714 :
1715 2 : LWLockRelease(LogicalRepWorkerLock);
1716 :
1717 2 : return (Datum) 0;
1718 : }