Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * launcher.c
3 : * PostgreSQL logical replication worker launcher process
4 : *
5 : * Copyright (c) 2016-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/launcher.c
9 : *
10 : * NOTES
11 : * This module contains the logical replication worker launcher which
12 : * uses the background worker infrastructure to start the logical
13 : * replication workers for every enabled subscription.
14 : *
15 : *-------------------------------------------------------------------------
16 : */
17 :
18 : #include "postgres.h"
19 :
20 : #include "access/heapam.h"
21 : #include "access/htup.h"
22 : #include "access/htup_details.h"
23 : #include "access/tableam.h"
24 : #include "access/xact.h"
25 : #include "catalog/pg_subscription.h"
26 : #include "catalog/pg_subscription_rel.h"
27 : #include "funcapi.h"
28 : #include "lib/dshash.h"
29 : #include "miscadmin.h"
30 : #include "pgstat.h"
31 : #include "postmaster/bgworker.h"
32 : #include "postmaster/interrupt.h"
33 : #include "replication/logicallauncher.h"
34 : #include "replication/origin.h"
35 : #include "replication/slot.h"
36 : #include "replication/walreceiver.h"
37 : #include "replication/worker_internal.h"
38 : #include "storage/ipc.h"
39 : #include "storage/proc.h"
40 : #include "storage/procarray.h"
41 : #include "tcop/tcopprot.h"
42 : #include "utils/builtins.h"
43 : #include "utils/memutils.h"
44 : #include "utils/pg_lsn.h"
45 : #include "utils/snapmgr.h"
46 : #include "utils/syscache.h"
47 :
48 : /* max sleep time between cycles (3min) */
49 : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
50 :
51 : /* GUC variables */
52 : int max_logical_replication_workers = 4;
53 : int max_sync_workers_per_subscription = 2;
54 : int max_parallel_apply_workers_per_subscription = 2;
55 :
56 : LogicalRepWorker *MyLogicalRepWorker = NULL;
57 :
58 : typedef struct LogicalRepCtxStruct
59 : {
60 : /* Supervisor process. */
61 : pid_t launcher_pid;
62 :
63 : /* Hash table holding last start times of subscriptions' apply workers. */
64 : dsa_handle last_start_dsa;
65 : dshash_table_handle last_start_dsh;
66 :
67 : /* Background workers. */
68 : LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
69 : } LogicalRepCtxStruct;
70 :
71 : static LogicalRepCtxStruct *LogicalRepCtx;
72 :
73 : /* an entry in the last-start-times shared hash table */
74 : typedef struct LauncherLastStartTimesEntry
75 : {
76 : Oid subid; /* OID of logrep subscription (hash key) */
77 : TimestampTz last_start_time; /* last time its apply worker was started */
78 : } LauncherLastStartTimesEntry;
79 :
80 : /* parameters for the last-start-times shared hash table */
81 : static const dshash_parameters dsh_params = {
82 : sizeof(Oid),
83 : sizeof(LauncherLastStartTimesEntry),
84 : dshash_memcmp,
85 : dshash_memhash,
86 : dshash_memcpy,
87 : LWTRANCHE_LAUNCHER_HASH
88 : };
89 :
90 : static dsa_area *last_start_times_dsa = NULL;
91 : static dshash_table *last_start_times = NULL;
92 :
93 : static bool on_commit_launcher_wakeup = false;
94 :
95 :
96 : static void logicalrep_launcher_onexit(int code, Datum arg);
97 : static void logicalrep_worker_onexit(int code, Datum arg);
98 : static void logicalrep_worker_detach(void);
99 : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
100 : static int logicalrep_pa_worker_count(Oid subid);
101 : static void logicalrep_launcher_attach_dshmem(void);
102 : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
103 : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
104 : static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
105 : static bool acquire_conflict_slot_if_exists(void);
106 : static void update_conflict_slot_xmin(TransactionId new_xmin);
107 : static void init_conflict_slot_xmin(void);
108 :
109 :
110 : /*
111 : * Load the list of subscriptions.
112 : *
113 : * Only the fields interesting for worker start/stop functions are filled for
114 : * each subscription.
115 : */
116 : static List *
117 5742 : get_subscription_list(void)
118 : {
119 5742 : List *res = NIL;
120 : Relation rel;
121 : TableScanDesc scan;
122 : HeapTuple tup;
123 : MemoryContext resultcxt;
124 :
125 : /* This is the context that we will allocate our output data in */
126 5742 : resultcxt = CurrentMemoryContext;
127 :
128 : /*
129 : * Start a transaction so we can access pg_subscription.
130 : */
131 5742 : StartTransactionCommand();
132 :
133 5742 : rel = table_open(SubscriptionRelationId, AccessShareLock);
134 5742 : scan = table_beginscan_catalog(rel, 0, NULL);
135 :
136 7468 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
137 : {
138 1726 : Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
139 : Subscription *sub;
140 : MemoryContext oldcxt;
141 :
142 : /*
143 : * Allocate our results in the caller's context, not the
144 : * transaction's. We do this inside the loop, and restore the original
145 : * context at the end, so that leaky things like heap_getnext() are
146 : * not called in a potentially long-lived context.
147 : */
148 1726 : oldcxt = MemoryContextSwitchTo(resultcxt);
149 :
150 1726 : sub = (Subscription *) palloc0(sizeof(Subscription));
151 1726 : sub->oid = subform->oid;
152 1726 : sub->dbid = subform->subdbid;
153 1726 : sub->owner = subform->subowner;
154 1726 : sub->enabled = subform->subenabled;
155 1726 : sub->name = pstrdup(NameStr(subform->subname));
156 1726 : sub->retaindeadtuples = subform->subretaindeadtuples;
157 1726 : sub->retentionactive = subform->subretentionactive;
158 : /* We don't fill fields we are not interested in. */
159 :
160 1726 : res = lappend(res, sub);
161 1726 : MemoryContextSwitchTo(oldcxt);
162 : }
163 :
164 5742 : table_endscan(scan);
165 5742 : table_close(rel, AccessShareLock);
166 :
167 5742 : CommitTransactionCommand();
168 :
169 5742 : return res;
170 : }
171 :
172 : /*
173 : * Wait for a background worker to start up and attach to the shmem context.
174 : *
175 : * This is only needed for cleaning up the shared memory in case the worker
176 : * fails to attach.
177 : *
178 : * Returns whether the attach was successful.
179 : */
180 : static bool
181 810 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
182 : uint16 generation,
183 : BackgroundWorkerHandle *handle)
184 : {
185 810 : bool result = false;
186 810 : bool dropped_latch = false;
187 :
188 : for (;;)
189 2288 : {
190 : BgwHandleStatus status;
191 : pid_t pid;
192 : int rc;
193 :
194 3098 : CHECK_FOR_INTERRUPTS();
195 :
196 3098 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
197 :
198 : /* Worker either died or has started. Return false if died. */
199 3098 : if (!worker->in_use || worker->proc)
200 : {
201 808 : result = worker->in_use;
202 808 : LWLockRelease(LogicalRepWorkerLock);
203 808 : break;
204 : }
205 :
206 2290 : LWLockRelease(LogicalRepWorkerLock);
207 :
208 : /* Check if worker has died before attaching, and clean up after it. */
209 2290 : status = GetBackgroundWorkerPid(handle, &pid);
210 :
211 2290 : if (status == BGWH_STOPPED)
212 : {
213 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
214 : /* Ensure that this was indeed the worker we waited for. */
215 0 : if (generation == worker->generation)
216 0 : logicalrep_worker_cleanup(worker);
217 0 : LWLockRelease(LogicalRepWorkerLock);
218 0 : break; /* result is already false */
219 : }
220 :
221 : /*
222 : * We need a timeout because we generally don't get notified via latch
223 : * about the worker attach. But we don't expect to have to wait long.
224 : */
225 2290 : rc = WaitLatch(MyLatch,
226 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
227 : 10L, WAIT_EVENT_BGWORKER_STARTUP);
228 :
229 2290 : if (rc & WL_LATCH_SET)
230 : {
231 952 : ResetLatch(MyLatch);
232 952 : CHECK_FOR_INTERRUPTS();
233 950 : dropped_latch = true;
234 : }
235 : }
236 :
237 : /*
238 : * If we had to clear a latch event in order to wait, be sure to restore
239 : * it before exiting. Otherwise caller may miss events.
240 : */
241 808 : if (dropped_latch)
242 808 : SetLatch(MyLatch);
243 :
244 808 : return result;
245 : }
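
/*
 * The dropped-latch bookkeeping above is a general latch-protocol idiom:
 * a helper that waits on MyLatch may consume a latch event that was meant
 * for its caller, so it must re-set the latch before returning. Below is
 * a minimal standalone sketch of the idiom; the function name and the
 * condition flag are hypothetical, not part of launcher.c.
 */
#include "postgres.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/latch.h"

static void
wait_for_condition(volatile bool *condition)	/* hypothetical helper */
{
	bool		dropped_latch = false;

	while (!*condition)
	{
		int			rc;

		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   10L /* ms */ , WAIT_EVENT_BGWORKER_STARTUP);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
			dropped_latch = true;	/* we may have eaten a caller's event */
		}
	}

	/* Replay any consumed latch event so the caller does not miss it. */
	if (dropped_latch)
		SetLatch(MyLatch);
}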
246 :
247 : /*
248 : * Walks the workers array and searches for one that matches the given
249 : * subscription id and relid.
250 : *
251 : * We are only interested in the leader apply worker or table sync worker.
252 : */
253 : LogicalRepWorker *
254 6016 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
255 : {
256 : int i;
257 6016 : LogicalRepWorker *res = NULL;
258 :
259 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
260 :
261 : /* Search for attached worker for a given subscription id. */
262 18254 : for (i = 0; i < max_logical_replication_workers; i++)
263 : {
264 15950 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
265 :
266 : /* Skip parallel apply workers. */
267 15950 : if (isParallelApplyWorker(w))
268 0 : continue;
269 :
270 15950 : if (w->in_use && w->subid == subid && w->relid == relid &&
271 3712 : (!only_running || w->proc))
272 : {
273 3712 : res = w;
274 3712 : break;
275 : }
276 : }
277 :
278 6016 : return res;
279 : }
280 :
281 : /*
282 : * Similar to logicalrep_worker_find(), but returns a list of all workers for
283 : * the subscription, instead of just one.
284 : */
285 : List *
286 1282 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
287 : {
288 : int i;
289 1282 : List *res = NIL;
290 :
291 1282 : if (acquire_lock)
292 238 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
293 :
294 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
295 :
296 : /* Search for attached worker for a given subscription id. */
297 6650 : for (i = 0; i < max_logical_replication_workers; i++)
298 : {
299 5368 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
300 :
301 5368 : if (w->in_use && w->subid == subid && (!only_running || w->proc))
302 904 : res = lappend(res, w);
303 : }
304 :
305 1282 : if (acquire_lock)
306 238 : LWLockRelease(LogicalRepWorkerLock);
307 :
308 1282 : return res;
309 : }
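
/*
 * A sketch of a typical caller of logicalrep_workers_find(), modeled on
 * the DROP SUBSCRIPTION path in commands/subscriptioncmds.c: collect all
 * workers for the subscription under a shared lock, then stop them one by
 * one. The wrapper function itself is hypothetical.
 */
#include "postgres.h"
#include "nodes/pg_list.h"
#include "replication/worker_internal.h"
#include "storage/lwlock.h"

static void
stop_all_subscription_workers(Oid subid)	/* hypothetical helper */
{
	List	   *subworkers;
	ListCell   *lc;

	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
	subworkers = logicalrep_workers_find(subid, false, false);
	LWLockRelease(LogicalRepWorkerLock);

	foreach(lc, subworkers)
	{
		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);

		/* Re-checks the worker under lock and waits for it to detach. */
		logicalrep_worker_stop(w->subid, w->relid);
	}
	list_free(subworkers);
}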
310 :
311 : /*
312 : * Start new logical replication background worker, if possible.
313 : *
314 : * Returns true on success, false on failure.
315 : */
316 : bool
317 810 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
318 : Oid dbid, Oid subid, const char *subname, Oid userid,
319 : Oid relid, dsm_handle subworker_dsm,
320 : bool retain_dead_tuples)
321 : {
322 : BackgroundWorker bgw;
323 : BackgroundWorkerHandle *bgw_handle;
324 : uint16 generation;
325 : int i;
326 810 : int slot = 0;
327 810 : LogicalRepWorker *worker = NULL;
328 : int nsyncworkers;
329 : int nparallelapplyworkers;
330 : TimestampTz now;
331 810 : bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
332 810 : bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
333 :
334 : /*----------
335 : * Sanity checks:
336 : * - must be valid worker type
337 : * - tablesync workers are the only ones to have relid
338 : * - parallel apply worker is the only kind of subworker
339 : * - The replication slot used in conflict detection is created when
340 : * retain_dead_tuples is enabled
341 : */
342 : Assert(wtype != WORKERTYPE_UNKNOWN);
343 : Assert(is_tablesync_worker == OidIsValid(relid));
344 : Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
345 : Assert(!retain_dead_tuples || MyReplicationSlot);
346 :
347 810 : ereport(DEBUG1,
348 : (errmsg_internal("starting logical replication worker for subscription \"%s\"",
349 : subname)));
350 :
351 : /* Report this after the initial starting message for consistency. */
352 810 : if (max_active_replication_origins == 0)
353 0 : ereport(ERROR,
354 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
355 : errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
356 :
357 : /*
358 : * We need to modify the shared memory under lock so that we have a
359 : * consistent view.
360 : */
361 810 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
362 :
363 810 : retry:
364 : /* Find unused worker slot. */
365 1410 : for (i = 0; i < max_logical_replication_workers; i++)
366 : {
367 1410 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
368 :
369 1410 : if (!w->in_use)
370 : {
371 810 : worker = w;
372 810 : slot = i;
373 810 : break;
374 : }
375 : }
376 :
377 810 : nsyncworkers = logicalrep_sync_worker_count(subid);
378 :
379 810 : now = GetCurrentTimestamp();
380 :
381 : /*
382 : * If we didn't find a free slot, try to do garbage collection. The
383 : * reason we do this is that if some worker failed to start up and its
384 : * parent has crashed while waiting, the in_use state was never cleared.
385 : */
386 810 : if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
387 : {
388 0 : bool did_cleanup = false;
389 :
390 0 : for (i = 0; i < max_logical_replication_workers; i++)
391 : {
392 0 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
393 :
394 : /*
395 : * If the worker was marked in use but didn't manage to attach in
396 : * time, clean it up.
397 : */
398 0 : if (w->in_use && !w->proc &&
399 0 : TimestampDifferenceExceeds(w->launch_time, now,
400 : wal_receiver_timeout))
401 : {
402 0 : elog(WARNING,
403 : "logical replication worker for subscription %u took too long to start; canceled",
404 : w->subid);
405 :
406 0 : logicalrep_worker_cleanup(w);
407 0 : did_cleanup = true;
408 : }
409 : }
410 :
411 0 : if (did_cleanup)
412 0 : goto retry;
413 : }
414 :
415 : /*
416 : * We don't allow invoking more sync workers once we have reached the
417 : * sync worker limit per subscription. So, just return silently as we
418 : * might get here because of an otherwise harmless race condition.
419 : */
420 810 : if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
421 : {
422 0 : LWLockRelease(LogicalRepWorkerLock);
423 0 : return false;
424 : }
425 :
426 810 : nparallelapplyworkers = logicalrep_pa_worker_count(subid);
427 :
428 : /*
429 : * Return false if the number of parallel apply workers reached the limit
430 : * per subscription.
431 : */
432 810 : if (is_parallel_apply_worker &&
433 20 : nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
434 : {
435 0 : LWLockRelease(LogicalRepWorkerLock);
436 0 : return false;
437 : }
438 :
439 : /*
440 : * However, if there are no more free worker slots, inform the user about it
441 : * before exiting.
442 : */
443 810 : if (worker == NULL)
444 : {
445 0 : LWLockRelease(LogicalRepWorkerLock);
446 0 : ereport(WARNING,
447 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
448 : errmsg("out of logical replication worker slots"),
449 : errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
450 0 : return false;
451 : }
452 :
453 : /* Prepare the worker slot. */
454 810 : worker->type = wtype;
455 810 : worker->launch_time = now;
456 810 : worker->in_use = true;
457 810 : worker->generation++;
458 810 : worker->proc = NULL;
459 810 : worker->dbid = dbid;
460 810 : worker->userid = userid;
461 810 : worker->subid = subid;
462 810 : worker->relid = relid;
463 810 : worker->relstate = SUBREL_STATE_UNKNOWN;
464 810 : worker->relstate_lsn = InvalidXLogRecPtr;
465 810 : worker->stream_fileset = NULL;
466 810 : worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
467 810 : worker->parallel_apply = is_parallel_apply_worker;
468 810 : worker->oldest_nonremovable_xid = retain_dead_tuples
469 4 : ? MyReplicationSlot->data.xmin
470 810 : : InvalidTransactionId;
471 810 : worker->last_lsn = InvalidXLogRecPtr;
472 810 : TIMESTAMP_NOBEGIN(worker->last_send_time);
473 810 : TIMESTAMP_NOBEGIN(worker->last_recv_time);
474 810 : worker->reply_lsn = InvalidXLogRecPtr;
475 810 : TIMESTAMP_NOBEGIN(worker->reply_time);
476 :
477 : /* Before releasing lock, remember generation for future identification. */
478 810 : generation = worker->generation;
479 :
480 810 : LWLockRelease(LogicalRepWorkerLock);
481 :
482 : /* Register the new dynamic worker. */
483 810 : memset(&bgw, 0, sizeof(bgw));
484 810 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
485 : BGWORKER_BACKEND_DATABASE_CONNECTION;
486 810 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
487 810 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
488 :
489 810 : switch (worker->type)
490 : {
491 402 : case WORKERTYPE_APPLY:
492 402 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
493 402 : snprintf(bgw.bgw_name, BGW_MAXLEN,
494 : "logical replication apply worker for subscription %u",
495 : subid);
496 402 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
497 402 : break;
498 :
499 20 : case WORKERTYPE_PARALLEL_APPLY:
500 20 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
501 20 : snprintf(bgw.bgw_name, BGW_MAXLEN,
502 : "logical replication parallel apply worker for subscription %u",
503 : subid);
504 20 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
505 :
506 20 : memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
507 20 : break;
508 :
509 388 : case WORKERTYPE_TABLESYNC:
510 388 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
511 388 : snprintf(bgw.bgw_name, BGW_MAXLEN,
512 : "logical replication tablesync worker for subscription %u sync %u",
513 : subid,
514 : relid);
515 388 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
516 388 : break;
517 :
518 0 : case WORKERTYPE_UNKNOWN:
519 : /* Should never happen. */
520 0 : elog(ERROR, "unknown worker type");
521 : }
522 :
523 810 : bgw.bgw_restart_time = BGW_NEVER_RESTART;
524 810 : bgw.bgw_notify_pid = MyProcPid;
525 810 : bgw.bgw_main_arg = Int32GetDatum(slot);
526 :
527 810 : if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
528 : {
529 : /* Failed to start worker, so clean up the worker slot. */
530 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
531 : Assert(generation == worker->generation);
532 0 : logicalrep_worker_cleanup(worker);
533 0 : LWLockRelease(LogicalRepWorkerLock);
534 :
535 0 : ereport(WARNING,
536 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
537 : errmsg("out of background worker slots"),
538 : errhint("You might need to increase \"%s\".", "max_worker_processes")));
539 0 : return false;
540 : }
541 :
542 : /* Now wait until it attaches. */
543 810 : return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
544 : }
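
/*
 * A sketch of the worker side of this launch handshake (the real entry
 * points are ApplyWorkerMain() and friends in worker.c): the slot index
 * arrives in bgw_main_arg, and logicalrep_worker_attach() sets
 * worker->proc, which is what releases the launcher from
 * WaitForReplicationWorkerAttach(). The entry-point name below is
 * hypothetical.
 */
#include "postgres.h"
#include "replication/worker_internal.h"

void
SketchWorkerMain(Datum main_arg)	/* hypothetical entry point */
{
	int			worker_slot = DatumGetInt32(main_arg);

	/* Errors out if the slot was reclaimed by launcher garbage collection. */
	logicalrep_worker_attach(worker_slot);

	/* ... establish signal handlers, connect to the database, apply ... */
}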
545 :
546 : /*
547 : * Internal function to stop the worker and wait until it detaches from the
548 : * slot.
549 : */
550 : static void
551 164 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
552 : {
553 : uint16 generation;
554 :
555 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
556 :
557 : /*
558 : * Remember which generation our worker was, so we can check whether what
559 : * we see is still the same one.
560 : */
561 164 : generation = worker->generation;
562 :
563 : /*
564 : * If we found a worker but it does not have proc set then it is still
565 : * starting up; wait for it to finish starting and then kill it.
566 : */
567 168 : while (worker->in_use && !worker->proc)
568 : {
569 : int rc;
570 :
571 10 : LWLockRelease(LogicalRepWorkerLock);
572 :
573 : /* Wait a bit --- we don't expect to have to wait long. */
574 10 : rc = WaitLatch(MyLatch,
575 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
576 : 10L, WAIT_EVENT_BGWORKER_STARTUP);
577 :
578 10 : if (rc & WL_LATCH_SET)
579 : {
580 0 : ResetLatch(MyLatch);
581 0 : CHECK_FOR_INTERRUPTS();
582 : }
583 :
584 : /* Recheck worker status. */
585 10 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
586 :
587 : /*
588 : * Check whether the worker slot is no longer used, which would mean
589 : * that the worker has exited, or whether the worker generation is
590 : * different, meaning that a different worker has taken the slot.
591 : */
592 10 : if (!worker->in_use || worker->generation != generation)
593 0 : return;
594 :
595 : /* Worker has assigned proc, so it has started. */
596 10 : if (worker->proc)
597 6 : break;
598 : }
599 :
600 : /* Now terminate the worker ... */
601 164 : kill(worker->proc->pid, signo);
602 :
603 : /* ... and wait for it to die. */
604 : for (;;)
605 232 : {
606 : int rc;
607 :
608 : /* is it gone? */
609 396 : if (!worker->proc || worker->generation != generation)
610 : break;
611 :
612 232 : LWLockRelease(LogicalRepWorkerLock);
613 :
614 : /* Wait a bit --- we don't expect to have to wait long. */
615 232 : rc = WaitLatch(MyLatch,
616 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
617 : 10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
618 :
619 232 : if (rc & WL_LATCH_SET)
620 : {
621 96 : ResetLatch(MyLatch);
622 96 : CHECK_FOR_INTERRUPTS();
623 : }
624 :
625 232 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
626 : }
627 : }
628 :
629 : /*
630 : * Stop the logical replication worker for subid/relid, if any.
631 : */
632 : void
633 192 : logicalrep_worker_stop(Oid subid, Oid relid)
634 : {
635 : LogicalRepWorker *worker;
636 :
637 192 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
638 :
639 192 : worker = logicalrep_worker_find(subid, relid, false);
640 :
641 192 : if (worker)
642 : {
643 : Assert(!isParallelApplyWorker(worker));
644 148 : logicalrep_worker_stop_internal(worker, SIGTERM);
645 : }
646 :
647 192 : LWLockRelease(LogicalRepWorkerLock);
648 192 : }
649 :
650 : /*
651 : * Stop the given logical replication parallel apply worker.
652 : *
653 : * Note that the function sends SIGUSR2 instead of SIGTERM to the parallel apply
654 : * worker so that the worker exits cleanly.
655 : */
656 : void
657 10 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
658 : {
659 : int slot_no;
660 : uint16 generation;
661 : LogicalRepWorker *worker;
662 :
663 10 : SpinLockAcquire(&winfo->shared->mutex);
664 10 : generation = winfo->shared->logicalrep_worker_generation;
665 10 : slot_no = winfo->shared->logicalrep_worker_slot_no;
666 10 : SpinLockRelease(&winfo->shared->mutex);
667 :
668 : Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
669 :
670 : /*
671 : * Detach from the error_mq_handle for the parallel apply worker before
672 : * stopping it. This prevents the leader apply worker from trying to
673 : * receive messages from an error queue that might already have been
674 : * detached by the parallel apply worker.
675 : */
676 10 : if (winfo->error_mq_handle)
677 : {
678 10 : shm_mq_detach(winfo->error_mq_handle);
679 10 : winfo->error_mq_handle = NULL;
680 : }
681 :
682 10 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
683 :
684 10 : worker = &LogicalRepCtx->workers[slot_no];
685 : Assert(isParallelApplyWorker(worker));
686 :
687 : /*
688 : * Only stop the worker if the generation matches and the worker is alive.
689 : */
690 10 : if (worker->generation == generation && worker->proc)
691 10 : logicalrep_worker_stop_internal(worker, SIGUSR2);
692 :
693 10 : LWLockRelease(LogicalRepWorkerLock);
694 10 : }
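
/*
 * The mutex dance above follows the standard spinlock copy-out idiom:
 * hold the spinlock only long enough to copy fields into locals, and
 * never call anything that could elog() or block while holding it. A
 * minimal sketch with hypothetical struct and function names:
 */
#include "postgres.h"
#include "storage/spin.h"

typedef struct SharedWorkerInfo	/* hypothetical shared struct */
{
	slock_t		mutex;
	int			slot_no;
	uint16		generation;
} SharedWorkerInfo;

static void
read_worker_info(SharedWorkerInfo *shared, int *slot_no, uint16 *generation)
{
	SpinLockAcquire(&shared->mutex);
	*slot_no = shared->slot_no;
	*generation = shared->generation;
	SpinLockRelease(&shared->mutex);

	/* All further work happens on the local copies. */
}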
695 :
696 : /*
697 : * Wake up (using latch) any logical replication worker for specified sub/rel.
698 : */
699 : void
700 422 : logicalrep_worker_wakeup(Oid subid, Oid relid)
701 : {
702 : LogicalRepWorker *worker;
703 :
704 422 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
705 :
706 422 : worker = logicalrep_worker_find(subid, relid, true);
707 :
708 422 : if (worker)
709 422 : logicalrep_worker_wakeup_ptr(worker);
710 :
711 422 : LWLockRelease(LogicalRepWorkerLock);
712 422 : }
713 :
714 : /*
715 : * Wake up (using latch) the specified logical replication worker.
716 : *
717 : * Caller must hold lock, else worker->proc could change under us.
718 : */
719 : void
720 1286 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
721 : {
722 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
723 :
724 1286 : SetLatch(&worker->proc->procLatch);
725 1286 : }
726 :
727 : /*
728 : * Attach to a slot.
729 : */
730 : void
731 1026 : logicalrep_worker_attach(int slot)
732 : {
733 : /* Block concurrent access. */
734 1026 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
735 :
736 : Assert(slot >= 0 && slot < max_logical_replication_workers);
737 1026 : MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
738 :
739 1026 : if (!MyLogicalRepWorker->in_use)
740 : {
741 0 : LWLockRelease(LogicalRepWorkerLock);
742 0 : ereport(ERROR,
743 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
744 : errmsg("logical replication worker slot %d is empty, cannot attach",
745 : slot)));
746 : }
747 :
748 1026 : if (MyLogicalRepWorker->proc)
749 : {
750 0 : LWLockRelease(LogicalRepWorkerLock);
751 0 : ereport(ERROR,
752 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
753 : errmsg("logical replication worker slot %d is already used by "
754 : "another worker, cannot attach", slot)));
755 : }
756 :
757 1026 : MyLogicalRepWorker->proc = MyProc;
758 1026 : before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
759 :
760 1026 : LWLockRelease(LogicalRepWorkerLock);
761 1026 : }
762 :
763 : /*
764 : * Stop the parallel apply workers if any, and detach the leader apply worker
765 : * (cleans up the worker info).
766 : */
767 : static void
768 1026 : logicalrep_worker_detach(void)
769 : {
770 : /* Stop the parallel apply workers. */
771 1026 : if (am_leader_apply_worker())
772 : {
773 : List *workers;
774 : ListCell *lc;
775 :
776 : /*
777 : * Detach from the error_mq_handle for all parallel apply workers
778 : * before terminating them. This prevents the leader apply worker from
779 : * receiving the worker termination message and logging it when the
780 : * parallel worker has already done so.
781 : */
782 614 : pa_detach_all_error_mq();
783 :
784 614 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
785 :
786 614 : workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
787 1236 : foreach(lc, workers)
788 : {
789 622 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
790 :
791 622 : if (isParallelApplyWorker(w))
792 6 : logicalrep_worker_stop_internal(w, SIGTERM);
793 : }
794 :
795 614 : LWLockRelease(LogicalRepWorkerLock);
796 :
797 614 : list_free(workers);
798 : }
799 :
800 : /* Block concurrent access. */
801 1026 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
802 :
803 1026 : logicalrep_worker_cleanup(MyLogicalRepWorker);
804 :
805 1026 : LWLockRelease(LogicalRepWorkerLock);
806 1026 : }
807 :
808 : /*
809 : * Clean up worker info.
810 : */
811 : static void
812 1026 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
813 : {
814 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
815 :
816 1026 : worker->type = WORKERTYPE_UNKNOWN;
817 1026 : worker->in_use = false;
818 1026 : worker->proc = NULL;
819 1026 : worker->dbid = InvalidOid;
820 1026 : worker->userid = InvalidOid;
821 1026 : worker->subid = InvalidOid;
822 1026 : worker->relid = InvalidOid;
823 1026 : worker->leader_pid = InvalidPid;
824 1026 : worker->parallel_apply = false;
825 1026 : }
826 :
827 : /*
828 : * Cleanup function for logical replication launcher.
829 : *
830 : * Called on logical replication launcher exit.
831 : */
832 : static void
833 852 : logicalrep_launcher_onexit(int code, Datum arg)
834 : {
835 852 : LogicalRepCtx->launcher_pid = 0;
836 852 : }
837 :
838 : /*
839 : * Cleanup function.
840 : *
841 : * Called on logical replication worker exit.
842 : */
843 : static void
844 1026 : logicalrep_worker_onexit(int code, Datum arg)
845 : {
846 : /* Disconnect gracefully from the remote side. */
847 1026 : if (LogRepWorkerWalRcvConn)
848 828 : walrcv_disconnect(LogRepWorkerWalRcvConn);
849 :
850 1026 : logicalrep_worker_detach();
851 :
852 : /* Cleanup fileset used for streaming transactions. */
853 1026 : if (MyLogicalRepWorker->stream_fileset != NULL)
854 28 : FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
855 :
856 : /*
857 : * Session level locks may be acquired outside of a transaction in
858 : * parallel apply mode and will not be released when the worker
859 : * terminates, so manually release all locks before the worker exits.
860 : *
861 : * The locks will be acquired once the worker is initialized.
862 : */
863 1026 : if (!InitializingApplyWorker)
864 916 : LockReleaseAll(DEFAULT_LOCKMETHOD, true);
865 :
866 1026 : ApplyLauncherWakeup();
867 1026 : }
868 :
869 : /*
870 : * Count the number of registered (not necessarily running) sync workers
871 : * for a subscription.
872 : */
873 : int
874 2426 : logicalrep_sync_worker_count(Oid subid)
875 : {
876 : int i;
877 2426 : int res = 0;
878 :
879 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
880 :
881 : /* Search for attached worker for a given subscription id. */
882 12510 : for (i = 0; i < max_logical_replication_workers; i++)
883 : {
884 10084 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
885 :
886 10084 : if (isTablesyncWorker(w) && w->subid == subid)
887 2692 : res++;
888 : }
889 :
890 2426 : return res;
891 : }
892 :
893 : /*
894 : * Count the number of registered (but not necessarily running) parallel apply
895 : * workers for a subscription.
896 : */
897 : static int
898 810 : logicalrep_pa_worker_count(Oid subid)
899 : {
900 : int i;
901 810 : int res = 0;
902 :
903 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
904 :
905 : /*
906 : * Scan all attached parallel apply workers, only counting those which
907 : * have the given subscription id.
908 : */
909 4282 : for (i = 0; i < max_logical_replication_workers; i++)
910 : {
911 3472 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
912 :
913 3472 : if (isParallelApplyWorker(w) && w->subid == subid)
914 4 : res++;
915 : }
916 :
917 810 : return res;
918 : }
919 :
920 : /*
921 : * ApplyLauncherShmemSize
922 : * Compute space needed for replication launcher shared memory
923 : */
924 : Size
925 8420 : ApplyLauncherShmemSize(void)
926 : {
927 : Size size;
928 :
929 : /*
930 : * Need the fixed struct and the array of LogicalRepWorker.
931 : */
932 8420 : size = sizeof(LogicalRepCtxStruct);
933 8420 : size = MAXALIGN(size);
934 8420 : size = add_size(size, mul_size(max_logical_replication_workers,
935 : sizeof(LogicalRepWorker)));
936 8420 : return size;
937 : }
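
/*
 * add_size() and mul_size() are overflow-checked Size arithmetic: they
 * ereport(ERROR) instead of silently wrapping, which matters when GUCs
 * such as max_logical_replication_workers scale the allocation. A
 * minimal sketch of the same sizing pattern, with hypothetical names:
 */
#include "postgres.h"
#include "storage/shmem.h"

typedef struct MySlot			/* hypothetical per-slot struct */
{
	int			something;
} MySlot;

static Size
my_shmem_size(int nslots)
{
	Size		size = MAXALIGN(sizeof(int));	/* fixed header part */

	/* errors out cleanly on overflow rather than under-allocating */
	return add_size(size, mul_size(nslots, sizeof(MySlot)));
}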
938 :
939 : /*
940 : * ApplyLauncherRegister
941 : * Register a background worker running the logical replication launcher.
942 : */
943 : void
944 1746 : ApplyLauncherRegister(void)
945 : {
946 : BackgroundWorker bgw;
947 :
948 : /*
949 : * The logical replication launcher is disabled during binary upgrades, to
950 : * prevent logical replication workers from running on the source cluster.
951 : * That could cause replication origins to move forward after having been
952 : * copied to the target cluster, potentially creating conflicts with the
953 : * copied data files.
954 : */
955 1746 : if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
956 106 : return;
957 :
958 1640 : memset(&bgw, 0, sizeof(bgw));
959 1640 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
960 : BGWORKER_BACKEND_DATABASE_CONNECTION;
961 1640 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
962 1640 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
963 1640 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
964 1640 : snprintf(bgw.bgw_name, BGW_MAXLEN,
965 : "logical replication launcher");
966 1640 : snprintf(bgw.bgw_type, BGW_MAXLEN,
967 : "logical replication launcher");
968 1640 : bgw.bgw_restart_time = 5;
969 1640 : bgw.bgw_notify_pid = 0;
970 1640 : bgw.bgw_main_arg = (Datum) 0;
971 :
972 1640 : RegisterBackgroundWorker(&bgw);
973 : }
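
/*
 * Extensions use the same registration API from _PG_init() when loaded
 * via shared_preload_libraries (compare src/test/modules/worker_spi). A
 * minimal sketch; the library, function, and worker names below are
 * hypothetical:
 */
#include "postgres.h"
#include "fmgr.h"
#include "postmaster/bgworker.h"

PG_MODULE_MAGIC;

void
_PG_init(void)
{
	BackgroundWorker bgw;

	memset(&bgw, 0, sizeof(bgw));
	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
		BGWORKER_BACKEND_DATABASE_CONNECTION;
	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
	snprintf(bgw.bgw_library_name, MAXPGPATH, "my_extension");
	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "my_worker_main");
	snprintf(bgw.bgw_name, BGW_MAXLEN, "my extension worker");
	snprintf(bgw.bgw_type, BGW_MAXLEN, "my extension worker");
	bgw.bgw_restart_time = 5;	/* restart after 5s, like the launcher */
	bgw.bgw_notify_pid = 0;		/* no process to notify on start/stop */
	bgw.bgw_main_arg = (Datum) 0;

	RegisterBackgroundWorker(&bgw);
}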
974 :
975 : /*
976 : * ApplyLauncherShmemInit
977 : * Allocate and initialize replication launcher shared memory
978 : */
979 : void
980 2180 : ApplyLauncherShmemInit(void)
981 : {
982 : bool found;
983 :
984 2180 : LogicalRepCtx = (LogicalRepCtxStruct *)
985 2180 : ShmemInitStruct("Logical Replication Launcher Data",
986 : ApplyLauncherShmemSize(),
987 : &found);
988 :
989 2180 : if (!found)
990 : {
991 : int slot;
992 :
993 2180 : memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
994 :
995 2180 : LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
996 2180 : LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
997 :
998 : /* Initialize memory and spin locks for each worker slot. */
999 10826 : for (slot = 0; slot < max_logical_replication_workers; slot++)
1000 : {
1001 8646 : LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
1002 :
1003 8646 : memset(worker, 0, sizeof(LogicalRepWorker));
1004 8646 : SpinLockInit(&worker->relmutex);
1005 : }
1006 : }
1007 2180 : }
1008 :
1009 : /*
1010 : * Initialize or attach to the dynamic shared hash table that stores the
1011 : * last-start times, if not already done.
1012 : * This must be called before accessing the table.
1013 : */
1014 : static void
1015 1488 : logicalrep_launcher_attach_dshmem(void)
1016 : {
1017 : MemoryContext oldcontext;
1018 :
1019 : /* Quick exit if we already did this. */
1020 1488 : if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
1021 1378 : last_start_times != NULL)
1022 1014 : return;
1023 :
1024 : /* Otherwise, use a lock to ensure only one process creates the table. */
1025 474 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
1026 :
1027 : /* Be sure any local memory allocated by DSA routines is persistent. */
1028 474 : oldcontext = MemoryContextSwitchTo(TopMemoryContext);
1029 :
1030 474 : if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
1031 : {
1032 : /* Initialize dynamic shared hash table for last-start times. */
1033 110 : last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
1034 110 : dsa_pin(last_start_times_dsa);
1035 110 : dsa_pin_mapping(last_start_times_dsa);
1036 110 : last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
1037 :
1038 : /* Store handles in shared memory for other backends to use. */
1039 110 : LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
1040 110 : LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
1041 : }
1042 364 : else if (!last_start_times)
1043 : {
1044 : /* Attach to existing dynamic shared hash table. */
1045 364 : last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
1046 364 : dsa_pin_mapping(last_start_times_dsa);
1047 364 : last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
1048 364 : LogicalRepCtx->last_start_dsh, NULL);
1049 : }
1050 :
1051 474 : MemoryContextSwitchTo(oldcontext);
1052 474 : LWLockRelease(LogicalRepWorkerLock);
1053 : }
1054 :
1055 : /*
1056 : * Set the last-start time for the subscription.
1057 : */
1058 : static void
1059 402 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
1060 : {
1061 : LauncherLastStartTimesEntry *entry;
1062 : bool found;
1063 :
1064 402 : logicalrep_launcher_attach_dshmem();
1065 :
1066 402 : entry = dshash_find_or_insert(last_start_times, &subid, &found);
1067 402 : entry->last_start_time = start_time;
1068 402 : dshash_release_lock(last_start_times, entry);
1069 402 : }
1070 :
1071 : /*
1072 : * Return the last-start time for the subscription, or 0 if there isn't one.
1073 : */
1074 : static TimestampTz
1075 644 : ApplyLauncherGetWorkerStartTime(Oid subid)
1076 : {
1077 : LauncherLastStartTimesEntry *entry;
1078 : TimestampTz ret;
1079 :
1080 644 : logicalrep_launcher_attach_dshmem();
1081 :
1082 644 : entry = dshash_find(last_start_times, &subid, false);
1083 644 : if (entry == NULL)
1084 284 : return 0;
1085 :
1086 360 : ret = entry->last_start_time;
1087 360 : dshash_release_lock(last_start_times, entry);
1088 :
1089 360 : return ret;
1090 : }
1091 :
1092 : /*
1093 : * Remove the last-start-time entry for the subscription, if one exists.
1094 : *
1095 : * This has two use-cases: to remove the entry related to a subscription
1096 : * that's been deleted or disabled (just to avoid leaking shared memory),
1097 : * and to allow immediate restart of an apply worker that has exited
1098 : * due to subscription parameter changes.
1099 : */
1100 : void
1101 442 : ApplyLauncherForgetWorkerStartTime(Oid subid)
1102 : {
1103 442 : logicalrep_launcher_attach_dshmem();
1104 :
1105 442 : (void) dshash_delete_key(last_start_times, &subid);
1106 442 : }
1107 :
1108 : /*
1109 : * Wakeup the launcher on commit if requested.
1110 : */
1111 : void
1112 1100002 : AtEOXact_ApplyLauncher(bool isCommit)
1113 : {
1114 1100002 : if (isCommit)
1115 : {
1116 1049268 : if (on_commit_launcher_wakeup)
1117 286 : ApplyLauncherWakeup();
1118 : }
1119 :
1120 1100002 : on_commit_launcher_wakeup = false;
1121 1100002 : }
1122 :
1123 : /*
1124 : * Request wakeup of the launcher on commit of the transaction.
1125 : *
1126 : * This is used to signal the launcher to stop sleeping and process the
1127 : * subscriptions when the current transaction commits. It should be used
1128 : * when a new tuple has been added to the pg_subscription catalog.
1129 : */
1130 : void
1131 288 : ApplyLauncherWakeupAtCommit(void)
1132 : {
1133 288 : if (!on_commit_launcher_wakeup)
1134 286 : on_commit_launcher_wakeup = true;
1135 288 : }
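
/*
 * A sketch of the intended call site, modeled on CREATE SUBSCRIPTION in
 * commands/subscriptioncmds.c: request the wakeup right after inserting
 * the catalog tuple, so the launcher is signaled only once the tuple is
 * actually visible (i.e. at commit). The wrapper function and its
 * arguments are hypothetical.
 */
#include "postgres.h"
#include "access/htup.h"
#include "catalog/indexing.h"
#include "replication/logicallauncher.h"
#include "utils/rel.h"

static void
insert_subscription_tuple(Relation rel, HeapTuple tup, bool enabled)
{
	CatalogTupleInsert(rel, tup);

	/* No point waking the launcher for a subscription that can't run. */
	if (enabled)
		ApplyLauncherWakeupAtCommit();
}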
1136 :
1137 : /*
1138 : * Wakeup the launcher immediately.
1139 : */
1140 : void
1141 1392 : ApplyLauncherWakeup(void)
1142 : {
1143 1392 : if (LogicalRepCtx->launcher_pid != 0)
1144 1362 : kill(LogicalRepCtx->launcher_pid, SIGUSR1);
1145 1392 : }
1146 :
1147 : /*
1148 : * Main loop for the apply launcher process.
1149 : */
1150 : void
1151 852 : ApplyLauncherMain(Datum main_arg)
1152 : {
1153 852 : ereport(DEBUG1,
1154 : (errmsg_internal("logical replication launcher started")));
1155 :
1156 852 : before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
1157 :
1158 : Assert(LogicalRepCtx->launcher_pid == 0);
1159 852 : LogicalRepCtx->launcher_pid = MyProcPid;
1160 :
1161 : /* Establish signal handlers. */
1162 852 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
1163 852 : pqsignal(SIGTERM, die);
1164 852 : BackgroundWorkerUnblockSignals();
1165 :
1166 : /*
1167 : * Establish connection to nailed catalogs (we only ever access
1168 : * pg_subscription).
1169 : */
1170 852 : BackgroundWorkerInitializeConnection(NULL, NULL, 0);
1171 :
1172 : /*
1173 : * Acquire the conflict detection slot at startup to ensure it can be
1174 : * dropped if no longer needed after a restart.
1175 : */
1176 852 : acquire_conflict_slot_if_exists();
1177 :
1178 : /* Enter main loop */
1179 : for (;;)
1180 4896 : {
1181 : int rc;
1182 : List *sublist;
1183 : ListCell *lc;
1184 : MemoryContext subctx;
1185 : MemoryContext oldctx;
1186 5748 : long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
1187 5748 : bool can_update_xmin = true;
1188 5748 : bool retain_dead_tuples = false;
1189 5748 : TransactionId xmin = InvalidTransactionId;
1190 :
1191 5748 : CHECK_FOR_INTERRUPTS();
1192 :
1193 : /* Use temporary context to avoid leaking memory across cycles. */
1194 5742 : subctx = AllocSetContextCreate(TopMemoryContext,
1195 : "Logical Replication Launcher sublist",
1196 : ALLOCSET_DEFAULT_SIZES);
1197 5742 : oldctx = MemoryContextSwitchTo(subctx);
1198 :
1199 : /*
1200 : * Start any missing workers for enabled subscriptions.
1201 : *
1202 : * Also, during the iteration through all subscriptions, we compute
1203 : * the minimum XID required to protect deleted tuples for conflict
1204 : * detection if one of the subscriptions enables the retain_dead_tuples
1205 : * option.
1206 : */
1207 5742 : sublist = get_subscription_list();
1208 7468 : foreach(lc, sublist)
1209 : {
1210 1726 : Subscription *sub = (Subscription *) lfirst(lc);
1211 : LogicalRepWorker *w;
1212 : TimestampTz last_start;
1213 : TimestampTz now;
1214 : long elapsed;
1215 :
1216 1726 : if (sub->retaindeadtuples)
1217 : {
1218 108 : retain_dead_tuples = true;
1219 :
1220 : /*
1221 : * Create a replication slot to retain information necessary
1222 : * for conflict detection such as dead tuples, commit
1223 : * timestamps, and origins.
1224 : *
1225 : * The slot is created before starting the apply worker to
1226 : * prevent it from unnecessarily maintaining its
1227 : * oldest_nonremovable_xid.
1228 : *
1229 : * The slot is created even for a disabled subscription to
1230 : * ensure that conflict-related information is available when
1231 : * applying remote changes that occurred before the
1232 : * subscription was enabled.
1233 : */
1234 108 : CreateConflictDetectionSlot();
1235 :
1236 108 : if (sub->retentionactive)
1237 : {
1238 : /*
1239 : * Can't advance xmin of the slot unless all the
1240 : * subscriptions actively retaining dead tuples are
1241 : * enabled. This is required to ensure that we don't
1242 : * advance the xmin of CONFLICT_DETECTION_SLOT if one of
1243 : * the subscriptions is not enabled. Otherwise, we won't
1244 : * be able to detect conflicts reliably for such a
1245 : * subscription even though it has set the
1246 : * retain_dead_tuples option.
1247 : */
1248 108 : can_update_xmin &= sub->enabled;
1249 :
1250 : /*
1251 : * Initialize the slot once the subscription activates
1252 : * retention.
1253 : */
1254 108 : if (!TransactionIdIsValid(MyReplicationSlot->data.xmin))
1255 0 : init_conflict_slot_xmin();
1256 : }
1257 : }
1258 :
1259 1726 : if (!sub->enabled)
1260 74 : continue;
1261 :
1262 1652 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1263 1652 : w = logicalrep_worker_find(sub->oid, InvalidOid, false);
1264 :
1265 1652 : if (w != NULL)
1266 : {
1267 : /*
1268 : * Compute the minimum xmin required to protect dead tuples
1269 : * required for conflict detection among all running apply
1270 : * workers. This computation is performed while holding
1271 : * LogicalRepWorkerLock to prevent accessing invalid worker
1272 : * data, in scenarios where a worker might exit and reset its
1273 : * state concurrently.
1274 : */
1275 1008 : if (sub->retaindeadtuples &&
1276 100 : sub->retentionactive &&
1277 : can_update_xmin)
1278 100 : compute_min_nonremovable_xid(w, &xmin);
1279 :
1280 1008 : LWLockRelease(LogicalRepWorkerLock);
1281 :
1282 : /* worker is running already */
1283 1008 : continue;
1284 : }
1285 :
1286 644 : LWLockRelease(LogicalRepWorkerLock);
1287 :
1288 : /*
1289 : * Can't advance xmin of the slot unless all the workers
1290 : * corresponding to subscriptions actively retaining dead tuples
1291 : * are running, so disable further computation of the minimum
1292 : * nonremovable xid.
1293 : */
1294 644 : if (sub->retaindeadtuples && sub->retentionactive)
1295 4 : can_update_xmin = false;
1296 :
1297 : /*
1298 : * If the worker is eligible to start now, launch it. Otherwise,
1299 : * adjust wait_time so that we'll wake up as soon as it can be
1300 : * started.
1301 : *
1302 : * Each subscription's apply worker can only be restarted once per
1303 : * wal_retrieve_retry_interval, so that errors do not cause us to
1304 : * repeatedly restart the worker as fast as possible. In cases
1305 : * where a restart is expected (e.g., subscription parameter
1306 : * changes), another process should remove the last-start entry
1307 : * for the subscription so that the worker can be restarted
1308 : * without waiting for wal_retrieve_retry_interval to elapse.
1309 : */
1310 644 : last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
1311 644 : now = GetCurrentTimestamp();
1312 644 : if (last_start == 0 ||
1313 360 : (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
1314 : {
1315 402 : ApplyLauncherSetWorkerStartTime(sub->oid, now);
1316 402 : if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
1317 402 : sub->dbid, sub->oid, sub->name,
1318 : sub->owner, InvalidOid,
1319 : DSM_HANDLE_INVALID,
1320 406 : sub->retaindeadtuples &&
1321 4 : sub->retentionactive))
1322 : {
1323 : /*
1324 : * We get here either if we failed to launch a worker
1325 : * (perhaps for resource-exhaustion reasons) or if we
1326 : * launched one but it immediately quit. Either way, it
1327 : * seems appropriate to try again after
1328 : * wal_retrieve_retry_interval.
1329 : */
1330 12 : wait_time = Min(wait_time,
1331 : wal_retrieve_retry_interval);
1332 : }
1333 : }
1334 : else
1335 : {
1336 242 : wait_time = Min(wait_time,
1337 : wal_retrieve_retry_interval - elapsed);
1338 : }
1339 : }
1340 :
1341 : /*
1342 : * Drop the CONFLICT_DETECTION_SLOT if there is no subscription
1343 : * that requires us to retain dead tuples. Otherwise, if required,
1344 : * advance the slot's xmin to protect dead tuples required for the
1345 : * conflict detection.
1346 : *
1347 : * Additionally, if all apply workers for subscriptions with
1348 : * retain_dead_tuples enabled have requested to stop retention, the
1349 : * slot's xmin will be set to InvalidTransactionId allowing the
1350 : * removal of dead tuples.
1351 : */
1352 5742 : if (MyReplicationSlot)
1353 : {
1354 110 : if (!retain_dead_tuples)
1355 2 : ReplicationSlotDropAcquired();
1356 108 : else if (can_update_xmin)
1357 100 : update_conflict_slot_xmin(xmin);
1358 : }
1359 :
1360 : /* Switch back to original memory context. */
1361 5742 : MemoryContextSwitchTo(oldctx);
1362 : /* Clean the temporary memory. */
1363 5742 : MemoryContextDelete(subctx);
1364 :
1365 : /* Wait for more work. */
1366 5742 : rc = WaitLatch(MyLatch,
1367 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1368 : wait_time,
1369 : WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
1370 :
1371 5736 : if (rc & WL_LATCH_SET)
1372 : {
1373 5684 : ResetLatch(MyLatch);
1374 5684 : CHECK_FOR_INTERRUPTS();
1375 : }
1376 :
1377 4896 : if (ConfigReloadPending)
1378 : {
1379 72 : ConfigReloadPending = false;
1380 72 : ProcessConfigFile(PGC_SIGHUP);
1381 : }
1382 : }
1383 :
1384 : /* Not reachable */
1385 : }
1386 :
1387 : /*
1388 : * Determine the minimum non-removable transaction ID across all apply workers
1389 : * for subscriptions that have retain_dead_tuples enabled. Store the result
1390 : * in *xmin.
1391 : */
1392 : static void
1393 100 : compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
1394 : {
1395 : TransactionId nonremovable_xid;
1396 :
1397 : Assert(worker != NULL);
1398 :
1399 : /*
1400 : * The replication slot for conflict detection must be created before the
1401 : * worker starts.
1402 : */
1403 : Assert(MyReplicationSlot);
1404 :
1405 100 : SpinLockAcquire(&worker->relmutex);
1406 100 : nonremovable_xid = worker->oldest_nonremovable_xid;
1407 100 : SpinLockRelease(&worker->relmutex);
1408 :
1409 : /*
1410 : * Return if the apply worker has stopped retention concurrently.
1411 : *
1412 : * Although this function is invoked only when retentionactive is true,
1413 : * the apply worker might stop retention after the launcher fetches the
1414 : * retentionactive flag.
1415 : */
1416 100 : if (!TransactionIdIsValid(nonremovable_xid))
1417 0 : return;
1418 :
1419 100 : if (!TransactionIdIsValid(*xmin) ||
1420 0 : TransactionIdPrecedes(nonremovable_xid, *xmin))
1421 100 : *xmin = nonremovable_xid;
1422 : }
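
/*
 * TransactionIdPrecedes() is used above instead of a plain '<' because
 * normal XIDs live in a circular space: comparisons are defined modulo
 * 2^32 via a signed difference (see src/backend/access/transam/transam.c),
 * so the running minimum stays correct across XID wraparound. A minimal
 * sketch of the same "keep the older XID" step, with a hypothetical name:
 */
#include "postgres.h"
#include "access/transam.h"

static TransactionId
older_xid(TransactionId a, TransactionId b)	/* hypothetical helper */
{
	if (!TransactionIdIsValid(a))
		return b;				/* treat invalid as "no value yet" */
	if (!TransactionIdIsValid(b))
		return a;
	return TransactionIdPrecedes(a, b) ? a : b;
}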
1423 :
1424 : /*
1425 : * Acquire the replication slot used to retain information for conflict
1426 : * detection, if it exists.
1427 : *
1428 : * Return true if successfully acquired, otherwise return false.
1429 : */
1430 : static bool
1431 852 : acquire_conflict_slot_if_exists(void)
1432 : {
1433 852 : if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
1434 850 : return false;
1435 :
1436 2 : ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
1437 2 : return true;
1438 : }
1439 :
1440 : /*
1441 : * Update the xmin of the replication slot used to retain information
1442 : * required for conflict detection.
1443 : */
1444 : static void
1445 100 : update_conflict_slot_xmin(TransactionId new_xmin)
1446 : {
1447 : Assert(MyReplicationSlot);
1448 : Assert(!TransactionIdIsValid(new_xmin) ||
1449 : TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
1450 :
1451 : /* Return if the xmin value of the slot need not be updated */
1452 100 : if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
1453 70 : return;
1454 :
1455 30 : SpinLockAcquire(&MyReplicationSlot->mutex);
1456 30 : MyReplicationSlot->effective_xmin = new_xmin;
1457 30 : MyReplicationSlot->data.xmin = new_xmin;
1458 30 : SpinLockRelease(&MyReplicationSlot->mutex);
1459 :
1460 30 : elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
1461 :
1462 30 : ReplicationSlotMarkDirty();
1463 30 : ReplicationSlotsComputeRequiredXmin(false);
1464 :
1465 : /*
1466 : * Like PhysicalConfirmReceivedLocation(), do not save slot information
1467 : * each time. This is acceptable because all concurrent transactions on
1468 : * the publisher that require the data preceding the slot's xmin should
1469 : * have already been applied and flushed on the subscriber before the xmin
1470 : * is advanced. So, even if the slot's xmin regresses after a restart, it
1471 : * will be advanced again in the next cycle. Therefore, no data required
1472 : * for conflict detection will be prematurely removed.
1473 : */
1474 30 : return;
1475 : }
1476 :
1477 : /*
1478 : * Initialize the xmin for the conflict detection slot.
1479 : */
1480 : static void
1481 8 : init_conflict_slot_xmin(void)
1482 : {
1483 : TransactionId xmin_horizon;
1484 :
1485 : /* Replication slot must exist but shouldn't be initialized. */
1486 : Assert(MyReplicationSlot &&
1487 : !TransactionIdIsValid(MyReplicationSlot->data.xmin));
1488 :
1489 8 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
1490 :
1491 8 : xmin_horizon = GetOldestSafeDecodingTransactionId(false);
1492 :
1493 8 : SpinLockAcquire(&MyReplicationSlot->mutex);
1494 8 : MyReplicationSlot->effective_xmin = xmin_horizon;
1495 8 : MyReplicationSlot->data.xmin = xmin_horizon;
1496 8 : SpinLockRelease(&MyReplicationSlot->mutex);
1497 :
1498 8 : ReplicationSlotsComputeRequiredXmin(true);
1499 :
1500 8 : LWLockRelease(ProcArrayLock);
1501 :
1502 : /* Write this slot to disk */
1503 8 : ReplicationSlotMarkDirty();
1504 8 : ReplicationSlotSave();
1505 8 : }
1506 :
1507 : /*
1508 : * Create and acquire the replication slot used to retain information for
1509 : * conflict detection, if that has not been done yet.
1510 : */
1511 : void
1512 110 : CreateConflictDetectionSlot(void)
1513 : {
1514 : /* Exit early, if the replication slot is already created and acquired */
1515 110 : if (MyReplicationSlot)
1516 102 : return;
1517 :
1518 8 : ereport(LOG,
1519 : errmsg("creating replication conflict detection slot"));
1520 :
1521 8 : ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
1522 : false, false);
1523 :
1524 8 : init_conflict_slot_xmin();
1525 : }
1526 :
1527 : /*
1528 : * Is current process the logical replication launcher?
1529 : */
1530 : bool
1531 4896 : IsLogicalLauncher(void)
1532 : {
1533 4896 : return LogicalRepCtx->launcher_pid == MyProcPid;
1534 : }
1535 :
1536 : /*
1537 : * Return the pid of the leader apply worker if the given pid is the pid of a
1538 : * parallel apply worker, otherwise, return InvalidPid.
1539 : */
1540 : pid_t
1541 1896 : GetLeaderApplyWorkerPid(pid_t pid)
1542 : {
1543 1896 : int leader_pid = InvalidPid;
1544 : int i;
1545 :
1546 1896 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1547 :
1548 9480 : for (i = 0; i < max_logical_replication_workers; i++)
1549 : {
1550 7584 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
1551 :
1552 7584 : if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
1553 : {
1554 0 : leader_pid = w->leader_pid;
1555 0 : break;
1556 : }
1557 : }
1558 :
1559 1896 : LWLockRelease(LogicalRepWorkerLock);
1560 :
1561 1896 : return leader_pid;
1562 : }
1563 :
1564 : /*
1565 : * Returns the state of the subscriptions.
1566 : */
1567 : Datum
1568 2 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
1569 : {
1570 : #define PG_STAT_GET_SUBSCRIPTION_COLS 10
1571 2 : Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
1572 : int i;
1573 2 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1574 :
1575 2 : InitMaterializedSRF(fcinfo, 0);
1576 :
1577 : /* Make sure we get consistent view of the workers. */
1578 2 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1579 :
1580 10 : for (i = 0; i < max_logical_replication_workers; i++)
1581 : {
1582 : /* for each row */
1583 8 : Datum values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1584 8 : bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1585 : int worker_pid;
1586 : LogicalRepWorker worker;
1587 :
1588 8 : memcpy(&worker, &LogicalRepCtx->workers[i],
1589 : sizeof(LogicalRepWorker));
1590 8 : if (!worker.proc || !IsBackendPid(worker.proc->pid))
1591 4 : continue;
1592 :
1593 4 : if (OidIsValid(subid) && worker.subid != subid)
1594 0 : continue;
1595 :
1596 4 : worker_pid = worker.proc->pid;
1597 :
1598 4 : values[0] = ObjectIdGetDatum(worker.subid);
1599 4 : if (isTablesyncWorker(&worker))
1600 0 : values[1] = ObjectIdGetDatum(worker.relid);
1601 : else
1602 4 : nulls[1] = true;
1603 4 : values[2] = Int32GetDatum(worker_pid);
1604 :
1605 4 : if (isParallelApplyWorker(&worker))
1606 0 : values[3] = Int32GetDatum(worker.leader_pid);
1607 : else
1608 4 : nulls[3] = true;
1609 :
1610 4 : if (XLogRecPtrIsInvalid(worker.last_lsn))
1611 2 : nulls[4] = true;
1612 : else
1613 2 : values[4] = LSNGetDatum(worker.last_lsn);
1614 4 : if (worker.last_send_time == 0)
1615 0 : nulls[5] = true;
1616 : else
1617 4 : values[5] = TimestampTzGetDatum(worker.last_send_time);
1618 4 : if (worker.last_recv_time == 0)
1619 0 : nulls[6] = true;
1620 : else
1621 4 : values[6] = TimestampTzGetDatum(worker.last_recv_time);
1622 4 : if (XLogRecPtrIsInvalid(worker.reply_lsn))
1623 2 : nulls[7] = true;
1624 : else
1625 2 : values[7] = LSNGetDatum(worker.reply_lsn);
1626 4 : if (worker.reply_time == 0)
1627 0 : nulls[8] = true;
1628 : else
1629 4 : values[8] = TimestampTzGetDatum(worker.reply_time);
1630 :
1631 4 : switch (worker.type)
1632 : {
1633 4 : case WORKERTYPE_APPLY:
1634 4 : values[9] = CStringGetTextDatum("apply");
1635 4 : break;
1636 0 : case WORKERTYPE_PARALLEL_APPLY:
1637 0 : values[9] = CStringGetTextDatum("parallel apply");
1638 0 : break;
1639 0 : case WORKERTYPE_TABLESYNC:
1640 0 : values[9] = CStringGetTextDatum("table synchronization");
1641 0 : break;
1642 0 : case WORKERTYPE_UNKNOWN:
1643 : /* Should never happen. */
1644 0 : elog(ERROR, "unknown worker type");
1645 : }
1646 :
1647 4 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1648 : values, nulls);
1649 :
1650 : /*
1651 : * If only a single subscription was requested, and we found it,
1652 : * break.
1653 : */
1654 4 : if (OidIsValid(subid))
1655 0 : break;
1656 : }
1657 :
1658 2 : LWLockRelease(LogicalRepWorkerLock);
1659 :
1660 2 : return (Datum) 0;
1661 : }
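
/*
 * pg_stat_get_subscription() follows the standard materialized
 * set-returning-function pattern: InitMaterializedSRF() sets up a
 * tuplestore and tuple descriptor in rsinfo, and each row is appended
 * with tuplestore_putvalues(). A minimal two-column sketch of the same
 * pattern; the function name and columns are hypothetical:
 */
#include "postgres.h"
#include "fmgr.h"
#include "funcapi.h"
#include "utils/builtins.h"
#include "utils/tuplestore.h"

PG_FUNCTION_INFO_V1(my_status_srf);

Datum
my_status_srf(PG_FUNCTION_ARGS)
{
	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;

	/* Builds rsinfo->setResult / rsinfo->setDesc for us. */
	InitMaterializedSRF(fcinfo, 0);

	for (int i = 0; i < 3; i++)
	{
		Datum		values[2];
		bool		nulls[2] = {false, false};

		values[0] = Int32GetDatum(i);
		values[1] = CStringGetTextDatum("example row");

		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
							 values, nulls);
	}

	return (Datum) 0;
}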