Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * launcher.c
3 : * PostgreSQL logical replication worker launcher process
4 : *
5 : * Copyright (c) 2016-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/launcher.c
9 : *
10 : * NOTES
11 : * This module contains the logical replication worker launcher which
12 : * uses the background worker infrastructure to start the logical
13 : * replication workers for every enabled subscription.
14 : *
15 : *-------------------------------------------------------------------------
16 : */
17 :
18 : #include "postgres.h"
19 :
20 : #include "access/heapam.h"
21 : #include "access/htup.h"
22 : #include "access/htup_details.h"
23 : #include "access/tableam.h"
24 : #include "access/xact.h"
25 : #include "catalog/pg_subscription.h"
26 : #include "catalog/pg_subscription_rel.h"
27 : #include "funcapi.h"
28 : #include "lib/dshash.h"
29 : #include "miscadmin.h"
30 : #include "pgstat.h"
31 : #include "postmaster/bgworker.h"
32 : #include "postmaster/interrupt.h"
33 : #include "replication/logicallauncher.h"
34 : #include "replication/origin.h"
35 : #include "replication/slot.h"
36 : #include "replication/walreceiver.h"
37 : #include "replication/worker_internal.h"
38 : #include "storage/ipc.h"
39 : #include "storage/proc.h"
40 : #include "storage/procarray.h"
41 : #include "tcop/tcopprot.h"
42 : #include "utils/builtins.h"
43 : #include "utils/memutils.h"
44 : #include "utils/pg_lsn.h"
45 : #include "utils/snapmgr.h"
46 :
47 : /* max sleep time between cycles (3min) */
48 : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
49 :
50 : /* GUC variables */
51 : int max_logical_replication_workers = 4;
52 : int max_sync_workers_per_subscription = 2;
53 : int max_parallel_apply_workers_per_subscription = 2;
54 :
55 : LogicalRepWorker *MyLogicalRepWorker = NULL;
56 :
57 : typedef struct LogicalRepCtxStruct
58 : {
59 : /* Supervisor process. */
60 : pid_t launcher_pid;
61 :
62 : /* Hash table holding last start times of subscriptions' apply workers. */
63 : dsa_handle last_start_dsa;
64 : dshash_table_handle last_start_dsh;
65 :
66 : /* Background workers. */
67 : LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
68 : } LogicalRepCtxStruct;
69 :
70 : static LogicalRepCtxStruct *LogicalRepCtx;
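:
: /*
:  * Worker slots in LogicalRepCtx->workers are addressed by array index:
:  * logicalrep_worker_launch() passes the chosen index to the new process
:  * via bgw_main_arg, and the worker then claims the slot. A sketch of the
:  * two sides, assuming "slot" is the index picked by the launcher:
:  *
:  *	bgw.bgw_main_arg = Int32GetDatum(slot);    (launcher side)
:  *	slot = DatumGetInt32(main_arg);            (worker side)
:  *	logicalrep_worker_attach(slot);
:  */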
71 :
72 : /* an entry in the last-start-times shared hash table */
73 : typedef struct LauncherLastStartTimesEntry
74 : {
75 : Oid subid; /* OID of logrep subscription (hash key) */
76 : TimestampTz last_start_time; /* last time its apply worker was started */
77 : } LauncherLastStartTimesEntry;
78 :
79 : /* parameters for the last-start-times shared hash table */
80 : static const dshash_parameters dsh_params = {
81 : sizeof(Oid),
82 : sizeof(LauncherLastStartTimesEntry),
83 : dshash_memcmp,
84 : dshash_memhash,
85 : dshash_memcpy,
86 : LWTRANCHE_LAUNCHER_HASH
87 : };
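:
: /*
:  * These parameters make dshash treat the Oid key as raw memory:
:  * dshash_memcmp/memhash/memcpy operate on the first sizeof(Oid) bytes of
:  * each entry. A minimal lookup sketch, assuming the table has already
:  * been attached via logicalrep_launcher_attach_dshmem() below:
:  *
:  *	LauncherLastStartTimesEntry *entry;
:  *
:  *	entry = dshash_find(last_start_times, &subid, false);
:  *	if (entry)
:  *		dshash_release_lock(last_start_times, entry);
:  */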
88 :
89 : static dsa_area *last_start_times_dsa = NULL;
90 : static dshash_table *last_start_times = NULL;
91 :
92 : static bool on_commit_launcher_wakeup = false;
93 :
94 :
95 : static void logicalrep_launcher_onexit(int code, Datum arg);
96 : static void logicalrep_worker_onexit(int code, Datum arg);
97 : static void logicalrep_worker_detach(void);
98 : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
99 : static int logicalrep_pa_worker_count(Oid subid);
100 : static void logicalrep_launcher_attach_dshmem(void);
101 : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
102 : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
103 : static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
104 : static bool acquire_conflict_slot_if_exists(void);
105 : static void advance_conflict_slot_xmin(TransactionId new_xmin);
106 :
107 :
108 : /*
109 : * Load the list of subscriptions.
110 : *
111 : * Only the fields of interest to the worker start/stop functions are
112 : * filled in for each subscription.
113 : */
114 : static List *
115 5318 : get_subscription_list(void)
116 : {
117 5318 : List *res = NIL;
118 : Relation rel;
119 : TableScanDesc scan;
120 : HeapTuple tup;
121 : MemoryContext resultcxt;
122 :
123 : /* This is the context that we will allocate our output data in */
124 5318 : resultcxt = CurrentMemoryContext;
125 :
126 : /*
127 : * Start a transaction so we can access pg_subscription.
128 : */
129 5318 : StartTransactionCommand();
130 :
131 5318 : rel = table_open(SubscriptionRelationId, AccessShareLock);
132 5318 : scan = table_beginscan_catalog(rel, 0, NULL);
133 :
134 6702 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
135 : {
136 1384 : Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
137 : Subscription *sub;
138 : MemoryContext oldcxt;
139 :
140 : /*
141 : * Allocate our results in the caller's context, not the
142 : * transaction's. We do this inside the loop, and restore the original
143 : * context at the end, so that leaky things like heap_getnext() are
144 : * not called in a potentially long-lived context.
145 : */
146 1384 : oldcxt = MemoryContextSwitchTo(resultcxt);
147 :
148 1384 : sub = (Subscription *) palloc0(sizeof(Subscription));
149 1384 : sub->oid = subform->oid;
150 1384 : sub->dbid = subform->subdbid;
151 1384 : sub->owner = subform->subowner;
152 1384 : sub->enabled = subform->subenabled;
153 1384 : sub->name = pstrdup(NameStr(subform->subname));
154 1384 : sub->retaindeadtuples = subform->subretaindeadtuples;
155 : /* We don't fill fields we are not interested in. */
156 :
157 1384 : res = lappend(res, sub);
158 1384 : MemoryContextSwitchTo(oldcxt);
159 : }
160 :
161 5318 : table_endscan(scan);
162 5318 : table_close(rel, AccessShareLock);
163 :
164 5318 : CommitTransactionCommand();
165 :
166 5318 : return res;
167 : }
168 :
169 : /*
170 : * Wait for a background worker to start up and attach to the shmem context.
171 : *
172 : * This is only needed for cleaning up the shared memory in case the worker
173 : * fails to attach.
174 : *
175 : * Returns whether the attach was successful.
176 : */
177 : static bool
178 718 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
179 : uint16 generation,
180 : BackgroundWorkerHandle *handle)
181 : {
182 718 : bool result = false;
183 718 : bool dropped_latch = false;
184 :
185 : for (;;)
186 3228 : {
187 : BgwHandleStatus status;
188 : pid_t pid;
189 : int rc;
190 :
191 3946 : CHECK_FOR_INTERRUPTS();
192 :
193 3946 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
194 :
195 : /* Worker either died or has started. Return false if died. */
196 3946 : if (!worker->in_use || worker->proc)
197 : {
198 718 : result = worker->in_use;
199 718 : LWLockRelease(LogicalRepWorkerLock);
200 718 : break;
201 : }
202 :
203 3228 : LWLockRelease(LogicalRepWorkerLock);
204 :
205 : /* Check if worker has died before attaching, and clean up after it. */
206 3228 : status = GetBackgroundWorkerPid(handle, &pid);
207 :
208 3228 : if (status == BGWH_STOPPED)
209 : {
210 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
211 : /* Ensure that this was indeed the worker we waited for. */
212 0 : if (generation == worker->generation)
213 0 : logicalrep_worker_cleanup(worker);
214 0 : LWLockRelease(LogicalRepWorkerLock);
215 0 : break; /* result is already false */
216 : }
217 :
218 : /*
219 : * We need a timeout because we generally don't get notified via latch
220 : * about the worker attaching. But we don't expect to have to wait long.
221 : */
222 3228 : rc = WaitLatch(MyLatch,
223 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
224 : 10L, WAIT_EVENT_BGWORKER_STARTUP);
225 :
226 3228 : if (rc & WL_LATCH_SET)
227 : {
228 852 : ResetLatch(MyLatch);
229 852 : CHECK_FOR_INTERRUPTS();
230 852 : dropped_latch = true;
231 : }
232 : }
233 :
234 : /*
235 : * If we had to clear a latch event in order to wait, be sure to restore
236 : * it before exiting. Otherwise caller may miss events.
237 : */
238 718 : if (dropped_latch)
239 718 : SetLatch(MyLatch);
240 :
241 718 : return result;
242 : }
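:
: /*
:  * A condensed sketch of the caller protocol, as implemented by
:  * logicalrep_worker_launch() below: register the dynamic bgworker first,
:  * then wait for it to attach, cleaning up the slot on failure.
:  *
:  *	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
:  *		... clean up the worker slot and return false ...
:  *	return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
:  */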
243 :
244 : /*
245 : * Walks the workers array and searches for one that matches the given
246 : * subscription id and relid.
247 : *
248 : * We are only interested in the leader apply worker or table sync worker.
249 : */
250 : LogicalRepWorker *
251 5780 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
252 : {
253 : int i;
254 5780 : LogicalRepWorker *res = NULL;
255 :
256 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
257 :
258 : /* Search for attached worker for a given subscription id. */
259 17542 : for (i = 0; i < max_logical_replication_workers; i++)
260 : {
261 15342 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
262 :
263 : /* Skip parallel apply workers. */
264 15342 : if (isParallelApplyWorker(w))
265 0 : continue;
266 :
267 15342 : if (w->in_use && w->subid == subid && w->relid == relid &&
268 3580 : (!only_running || w->proc))
269 : {
270 3580 : res = w;
271 3580 : break;
272 : }
273 : }
274 :
275 5780 : return res;
276 : }
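:
: /*
:  * Callers must hold LogicalRepWorkerLock, as the Assert above enforces.
:  * A minimal usage sketch, mirroring logicalrep_worker_wakeup() below:
:  *
:  *	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
:  *	worker = logicalrep_worker_find(subid, InvalidOid, true);
:  *	if (worker)
:  *		logicalrep_worker_wakeup_ptr(worker);
:  *	LWLockRelease(LogicalRepWorkerLock);
:  */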
277 :
278 : /*
279 : * Similar to logicalrep_worker_find(), but returns a list of all workers for
280 : * the subscription, instead of just one.
281 : */
282 : List *
283 1138 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
284 : {
285 : int i;
286 1138 : List *res = NIL;
287 :
288 1138 : if (acquire_lock)
289 228 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
290 :
291 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
292 :
293 : /* Search for attached worker for a given subscription id. */
294 5874 : for (i = 0; i < max_logical_replication_workers; i++)
295 : {
296 4736 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
297 :
298 4736 : if (w->in_use && w->subid == subid && (!only_running || w->proc))
299 796 : res = lappend(res, w);
300 : }
301 :
302 1138 : if (acquire_lock)
303 228 : LWLockRelease(LogicalRepWorkerLock);
304 :
305 1138 : return res;
306 : }
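:
: /*
:  * A usage sketch with acquire_lock = false, mirroring
:  * logicalrep_worker_detach() below; the caller holds the lock so the
:  * returned pointers stay valid while the list is walked:
:  *
:  *	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
:  *	workers = logicalrep_workers_find(subid, true, false);
:  *	foreach(lc, workers)
:  *		... inspect or signal each LogicalRepWorker ...
:  *	LWLockRelease(LogicalRepWorkerLock);
:  */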
307 :
308 : /*
309 : * Start new logical replication background worker, if possible.
310 : *
311 : * Returns true on success, false on failure.
312 : */
313 : bool
314 732 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
315 : Oid dbid, Oid subid, const char *subname, Oid userid,
316 : Oid relid, dsm_handle subworker_dsm,
317 : bool retain_dead_tuples)
318 : {
319 : BackgroundWorker bgw;
320 : BackgroundWorkerHandle *bgw_handle;
321 : uint16 generation;
322 : int i;
323 732 : int slot = 0;
324 732 : LogicalRepWorker *worker = NULL;
325 : int nsyncworkers;
326 : int nparallelapplyworkers;
327 : TimestampTz now;
328 732 : bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
329 732 : bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
330 :
331 : /*----------
332 : * Sanity checks:
333 : * - must be a valid worker type
334 : * - tablesync workers are the only ones to have a relid
335 : * - a parallel apply worker is the only kind of subworker
336 : * - the replication slot used in conflict detection is created when
337 : * retain_dead_tuples is enabled
338 : */
339 : Assert(wtype != WORKERTYPE_UNKNOWN);
340 : Assert(is_tablesync_worker == OidIsValid(relid));
341 : Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
342 : Assert(!retain_dead_tuples || MyReplicationSlot);
343 :
344 732 : ereport(DEBUG1,
345 : (errmsg_internal("starting logical replication worker for subscription \"%s\"",
346 : subname)));
347 :
348 : /* Report this after the initial starting message for consistency. */
349 732 : if (max_active_replication_origins == 0)
350 0 : ereport(ERROR,
351 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
352 : errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
353 :
354 : /*
355 : * We need to modify the shared memory under lock so that we have a
356 : * consistent view.
357 : */
358 732 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
359 :
360 732 : retry:
361 : /* Find unused worker slot. */
362 1332 : for (i = 0; i < max_logical_replication_workers; i++)
363 : {
364 1332 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
365 :
366 1332 : if (!w->in_use)
367 : {
368 732 : worker = w;
369 732 : slot = i;
370 732 : break;
371 : }
372 : }
373 :
374 732 : nsyncworkers = logicalrep_sync_worker_count(subid);
375 :
376 732 : now = GetCurrentTimestamp();
377 :
378 : /*
379 : * If we didn't find a free slot, try to do garbage collection. The
380 : * reason we do this is that if some worker failed to start up and its
381 : * parent crashed while waiting, the in_use state was never cleared.
382 : */
383 732 : if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
384 : {
385 0 : bool did_cleanup = false;
386 :
387 0 : for (i = 0; i < max_logical_replication_workers; i++)
388 : {
389 0 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
390 :
391 : /*
392 : * If the worker was marked in use but didn't manage to attach in
393 : * time, clean it up.
394 : */
395 0 : if (w->in_use && !w->proc &&
396 0 : TimestampDifferenceExceeds(w->launch_time, now,
397 : wal_receiver_timeout))
398 : {
399 0 : elog(WARNING,
400 : "logical replication worker for subscription %u took too long to start; canceled",
401 : w->subid);
402 :
403 0 : logicalrep_worker_cleanup(w);
404 0 : did_cleanup = true;
405 : }
406 : }
407 :
408 0 : if (did_cleanup)
409 0 : goto retry;
410 : }
411 :
412 : /*
413 : * We don't allow invoking more sync workers once we have reached the
414 : * sync worker limit per subscription. So just return silently, as we
415 : * might get here because of an otherwise harmless race condition.
416 : */
417 732 : if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
418 : {
419 0 : LWLockRelease(LogicalRepWorkerLock);
420 0 : return false;
421 : }
422 :
423 732 : nparallelapplyworkers = logicalrep_pa_worker_count(subid);
424 :
425 : /*
426 : * Return false if the number of parallel apply workers reached the limit
427 : * per subscription.
428 : */
429 732 : if (is_parallel_apply_worker &&
430 20 : nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
431 : {
432 0 : LWLockRelease(LogicalRepWorkerLock);
433 0 : return false;
434 : }
435 :
436 : /*
437 : * However, if there are no free worker slots, inform the user about it
438 : * before returning.
439 : */
440 732 : if (worker == NULL)
441 : {
442 0 : LWLockRelease(LogicalRepWorkerLock);
443 0 : ereport(WARNING,
444 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
445 : errmsg("out of logical replication worker slots"),
446 : errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
447 0 : return false;
448 : }
449 :
450 : /* Prepare the worker slot. */
451 732 : worker->type = wtype;
452 732 : worker->launch_time = now;
453 732 : worker->in_use = true;
454 732 : worker->generation++;
455 732 : worker->proc = NULL;
456 732 : worker->dbid = dbid;
457 732 : worker->userid = userid;
458 732 : worker->subid = subid;
459 732 : worker->relid = relid;
460 732 : worker->relstate = SUBREL_STATE_UNKNOWN;
461 732 : worker->relstate_lsn = InvalidXLogRecPtr;
462 732 : worker->stream_fileset = NULL;
463 732 : worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
464 732 : worker->parallel_apply = is_parallel_apply_worker;
465 732 : worker->oldest_nonremovable_xid = retain_dead_tuples
466 2 : ? MyReplicationSlot->data.xmin
467 732 : : InvalidTransactionId;
468 732 : worker->last_lsn = InvalidXLogRecPtr;
469 732 : TIMESTAMP_NOBEGIN(worker->last_send_time);
470 732 : TIMESTAMP_NOBEGIN(worker->last_recv_time);
471 732 : worker->reply_lsn = InvalidXLogRecPtr;
472 732 : TIMESTAMP_NOBEGIN(worker->reply_time);
473 :
474 : /* Before releasing lock, remember generation for future identification. */
475 732 : generation = worker->generation;
476 :
477 732 : LWLockRelease(LogicalRepWorkerLock);
478 :
479 : /* Register the new dynamic worker. */
480 732 : memset(&bgw, 0, sizeof(bgw));
481 732 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
482 : BGWORKER_BACKEND_DATABASE_CONNECTION;
483 732 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
484 732 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
485 :
486 732 : switch (worker->type)
487 : {
488 310 : case WORKERTYPE_APPLY:
489 310 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
490 310 : snprintf(bgw.bgw_name, BGW_MAXLEN,
491 : "logical replication apply worker for subscription %u",
492 : subid);
493 310 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
494 310 : break;
495 :
496 20 : case WORKERTYPE_PARALLEL_APPLY:
497 20 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
498 20 : snprintf(bgw.bgw_name, BGW_MAXLEN,
499 : "logical replication parallel apply worker for subscription %u",
500 : subid);
501 20 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
502 :
503 20 : memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
504 20 : break;
505 :
506 402 : case WORKERTYPE_TABLESYNC:
507 402 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
508 402 : snprintf(bgw.bgw_name, BGW_MAXLEN,
509 : "logical replication tablesync worker for subscription %u sync %u",
510 : subid,
511 : relid);
512 402 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
513 402 : break;
514 :
515 0 : case WORKERTYPE_UNKNOWN:
516 : /* Should never happen. */
517 0 : elog(ERROR, "unknown worker type");
518 : }
519 :
520 732 : bgw.bgw_restart_time = BGW_NEVER_RESTART;
521 732 : bgw.bgw_notify_pid = MyProcPid;
522 732 : bgw.bgw_main_arg = Int32GetDatum(slot);
523 :
524 732 : if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
525 : {
526 : /* Failed to start worker, so clean up the worker slot. */
527 14 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
528 : Assert(generation == worker->generation);
529 14 : logicalrep_worker_cleanup(worker);
530 14 : LWLockRelease(LogicalRepWorkerLock);
531 :
532 14 : ereport(WARNING,
533 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
534 : errmsg("out of background worker slots"),
535 : errhint("You might need to increase \"%s\".", "max_worker_processes")));
536 14 : return false;
537 : }
538 :
539 : /* Now wait until it attaches. */
540 718 : return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
541 : }
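:
: /*
:  * Example invocations (argument values are illustrative): the launcher
:  * starts an apply worker with InvalidOid for relid, whereas tablesync
:  * code would pass a valid relid and no DSM segment:
:  *
:  *	logicalrep_worker_launch(WORKERTYPE_APPLY, sub->dbid, sub->oid,
:  *							 sub->name, sub->owner, InvalidOid,
:  *							 DSM_HANDLE_INVALID, sub->retaindeadtuples);
:  *	logicalrep_worker_launch(WORKERTYPE_TABLESYNC, sub->dbid, sub->oid,
:  *							 sub->name, sub->owner, relid,
:  *							 DSM_HANDLE_INVALID, false);
:  */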
542 :
543 : /*
544 : * Internal function to stop the worker and wait until it detaches from the
545 : * slot.
546 : */
547 : static void
548 164 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
549 : {
550 : uint16 generation;
551 :
552 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
553 :
554 : /*
555 : * Remember the generation of our worker so we can check whether what we
556 : * see is still the same one.
557 : */
558 164 : generation = worker->generation;
559 :
560 : /*
561 : * If we found a worker but it does not have a proc set, it is still
562 : * starting up; wait for it to finish starting and then kill it.
563 : */
564 168 : while (worker->in_use && !worker->proc)
565 : {
566 : int rc;
567 :
568 10 : LWLockRelease(LogicalRepWorkerLock);
569 :
570 : /* Wait a bit --- we don't expect to have to wait long. */
571 10 : rc = WaitLatch(MyLatch,
572 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
573 : 10L, WAIT_EVENT_BGWORKER_STARTUP);
574 :
575 10 : if (rc & WL_LATCH_SET)
576 : {
577 2 : ResetLatch(MyLatch);
578 2 : CHECK_FOR_INTERRUPTS();
579 : }
580 :
581 : /* Recheck worker status. */
582 10 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
583 :
584 : /*
585 : * Check whether the worker slot is no longer used, which would mean
586 : * that the worker has exited, or whether the worker generation is
587 : * different, meaning that a different worker has taken the slot.
588 : */
589 10 : if (!worker->in_use || worker->generation != generation)
590 0 : return;
591 :
592 : /* Worker has assigned proc, so it has started. */
593 10 : if (worker->proc)
594 6 : break;
595 : }
596 :
597 : /* Now terminate the worker ... */
598 164 : kill(worker->proc->pid, signo);
599 :
600 : /* ... and wait for it to die. */
601 : for (;;)
602 214 : {
603 : int rc;
604 :
605 : /* is it gone? */
606 378 : if (!worker->proc || worker->generation != generation)
607 : break;
608 :
609 214 : LWLockRelease(LogicalRepWorkerLock);
610 :
611 : /* Wait a bit --- we don't expect to have to wait long. */
612 214 : rc = WaitLatch(MyLatch,
613 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
614 : 10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
615 :
616 214 : if (rc & WL_LATCH_SET)
617 : {
618 98 : ResetLatch(MyLatch);
619 98 : CHECK_FOR_INTERRUPTS();
620 : }
621 :
622 214 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
623 : }
624 : }
625 :
626 : /*
627 : * Stop the logical replication worker for subid/relid, if any.
628 : */
629 : void
630 188 : logicalrep_worker_stop(Oid subid, Oid relid)
631 : {
632 : LogicalRepWorker *worker;
633 :
634 188 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
635 :
636 188 : worker = logicalrep_worker_find(subid, relid, false);
637 :
638 188 : if (worker)
639 : {
640 : Assert(!isParallelApplyWorker(worker));
641 148 : logicalrep_worker_stop_internal(worker, SIGTERM);
642 : }
643 :
644 188 : LWLockRelease(LogicalRepWorkerLock);
645 188 : }
646 :
647 : /*
648 : * Stop the given logical replication parallel apply worker.
649 : *
650 : * Note that the function sends SIGINT instead of SIGTERM to the parallel apply
651 : * worker so that the worker exits cleanly.
652 : */
653 : void
654 10 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
655 : {
656 : int slot_no;
657 : uint16 generation;
658 : LogicalRepWorker *worker;
659 :
660 10 : SpinLockAcquire(&winfo->shared->mutex);
661 10 : generation = winfo->shared->logicalrep_worker_generation;
662 10 : slot_no = winfo->shared->logicalrep_worker_slot_no;
663 10 : SpinLockRelease(&winfo->shared->mutex);
664 :
665 : Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
666 :
667 : /*
668 : * Detach from the error_mq_handle for the parallel apply worker before
669 : * stopping it. This prevents the leader apply worker from trying to
670 : * receive the message from the error queue that might already be detached
671 : * by the parallel apply worker.
672 : */
673 10 : if (winfo->error_mq_handle)
674 : {
675 10 : shm_mq_detach(winfo->error_mq_handle);
676 10 : winfo->error_mq_handle = NULL;
677 : }
678 :
679 10 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
680 :
681 10 : worker = &LogicalRepCtx->workers[slot_no];
682 : Assert(isParallelApplyWorker(worker));
683 :
684 : /*
685 : * Only stop the worker if the generation matches and the worker is alive.
686 : */
687 10 : if (worker->generation == generation && worker->proc)
688 10 : logicalrep_worker_stop_internal(worker, SIGINT);
689 :
690 10 : LWLockRelease(LogicalRepWorkerLock);
691 10 : }
692 :
693 : /*
694 : * Wake up (using latch) any logical replication worker for specified sub/rel.
695 : */
696 : void
697 420 : logicalrep_worker_wakeup(Oid subid, Oid relid)
698 : {
699 : LogicalRepWorker *worker;
700 :
701 420 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
702 :
703 420 : worker = logicalrep_worker_find(subid, relid, true);
704 :
705 420 : if (worker)
706 420 : logicalrep_worker_wakeup_ptr(worker);
707 :
708 420 : LWLockRelease(LogicalRepWorkerLock);
709 420 : }
710 :
711 : /*
712 : * Wake up (using latch) the specified logical replication worker.
713 : *
714 : * Caller must hold lock, else worker->proc could change under us.
715 : */
716 : void
717 1270 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
718 : {
719 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
720 :
721 1270 : SetLatch(&worker->proc->procLatch);
722 1270 : }
723 :
724 : /*
725 : * Attach to a slot.
726 : */
727 : void
728 926 : logicalrep_worker_attach(int slot)
729 : {
730 : /* Block concurrent access. */
731 926 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
732 :
733 : Assert(slot >= 0 && slot < max_logical_replication_workers);
734 926 : MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
735 :
736 926 : if (!MyLogicalRepWorker->in_use)
737 : {
738 0 : LWLockRelease(LogicalRepWorkerLock);
739 0 : ereport(ERROR,
740 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
741 : errmsg("logical replication worker slot %d is empty, cannot attach",
742 : slot)));
743 : }
744 :
745 926 : if (MyLogicalRepWorker->proc)
746 : {
747 0 : LWLockRelease(LogicalRepWorkerLock);
748 0 : ereport(ERROR,
749 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
750 : errmsg("logical replication worker slot %d is already used by "
751 : "another worker, cannot attach", slot)));
752 : }
753 :
754 926 : MyLogicalRepWorker->proc = MyProc;
755 926 : before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
756 :
757 926 : LWLockRelease(LogicalRepWorkerLock);
758 926 : }
759 :
760 : /*
761 : * Stop the parallel apply workers if any, and detach the leader apply worker
762 : * (cleans up the worker info).
763 : */
764 : static void
765 926 : logicalrep_worker_detach(void)
766 : {
767 : /* Stop the parallel apply workers. */
768 926 : if (am_leader_apply_worker())
769 : {
770 : List *workers;
771 : ListCell *lc;
772 :
773 : /*
774 : * Detach from the error_mq_handle for all parallel apply workers
775 : * before terminating them. This prevents the leader apply worker from
776 : * receiving the worker termination message and writing it to the log
777 : * when the parallel worker has already done so.
778 : */
779 514 : pa_detach_all_error_mq();
780 :
781 514 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
782 :
783 514 : workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
784 1040 : foreach(lc, workers)
785 : {
786 526 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
787 :
788 526 : if (isParallelApplyWorker(w))
789 6 : logicalrep_worker_stop_internal(w, SIGTERM);
790 : }
791 :
792 514 : LWLockRelease(LogicalRepWorkerLock);
793 : }
794 :
795 : /* Block concurrent access. */
796 926 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
797 :
798 926 : logicalrep_worker_cleanup(MyLogicalRepWorker);
799 :
800 926 : LWLockRelease(LogicalRepWorkerLock);
801 926 : }
802 :
803 : /*
804 : * Clean up worker info.
805 : */
806 : static void
807 940 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
808 : {
809 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
810 :
811 940 : worker->type = WORKERTYPE_UNKNOWN;
812 940 : worker->in_use = false;
813 940 : worker->proc = NULL;
814 940 : worker->dbid = InvalidOid;
815 940 : worker->userid = InvalidOid;
816 940 : worker->subid = InvalidOid;
817 940 : worker->relid = InvalidOid;
818 940 : worker->leader_pid = InvalidPid;
819 940 : worker->parallel_apply = false;
820 940 : }
821 :
822 : /*
823 : * Cleanup function for logical replication launcher.
824 : *
825 : * Called on logical replication launcher exit.
826 : */
827 : static void
828 818 : logicalrep_launcher_onexit(int code, Datum arg)
829 : {
830 818 : LogicalRepCtx->launcher_pid = 0;
831 818 : }
832 :
833 : /*
834 : * Cleanup function.
835 : *
836 : * Called on logical replication worker exit.
837 : */
838 : static void
839 926 : logicalrep_worker_onexit(int code, Datum arg)
840 : {
841 : /* Disconnect gracefully from the remote side. */
842 926 : if (LogRepWorkerWalRcvConn)
843 816 : walrcv_disconnect(LogRepWorkerWalRcvConn);
844 :
845 926 : logicalrep_worker_detach();
846 :
847 : /* Cleanup fileset used for streaming transactions. */
848 926 : if (MyLogicalRepWorker->stream_fileset != NULL)
849 28 : FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
850 :
851 : /*
852 : * Session level locks may be acquired outside of a transaction in
853 : * parallel apply mode and will not be released when the worker
854 : * terminates, so manually release all locks before the worker exits.
855 : *
856 : * The locks will be acquired once the worker is initialized.
857 : */
858 926 : if (!InitializingApplyWorker)
859 920 : LockReleaseAll(DEFAULT_LOCKMETHOD, true);
860 :
861 926 : ApplyLauncherWakeup();
862 926 : }
863 :
864 : /*
865 : * Count the number of registered (not necessarily running) sync workers
866 : * for a subscription.
867 : */
868 : int
869 2432 : logicalrep_sync_worker_count(Oid subid)
870 : {
871 : int i;
872 2432 : int res = 0;
873 :
874 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
875 :
876 : /* Search for attached worker for a given subscription id. */
877 12504 : for (i = 0; i < max_logical_replication_workers; i++)
878 : {
879 10072 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
880 :
881 10072 : if (isTablesyncWorker(w) && w->subid == subid)
882 2700 : res++;
883 : }
884 :
885 2432 : return res;
886 : }
887 :
888 : /*
889 : * Count the number of registered (but not necessarily running) parallel apply
890 : * workers for a subscription.
891 : */
892 : static int
893 732 : logicalrep_pa_worker_count(Oid subid)
894 : {
895 : int i;
896 732 : int res = 0;
897 :
898 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
899 :
900 : /*
901 : * Scan all attached parallel apply workers, only counting those which
902 : * have the given subscription id.
903 : */
904 3848 : for (i = 0; i < max_logical_replication_workers; i++)
905 : {
906 3116 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
907 :
908 3116 : if (isParallelApplyWorker(w) && w->subid == subid)
909 4 : res++;
910 : }
911 :
912 732 : return res;
913 : }
914 :
915 : /*
916 : * ApplyLauncherShmemSize
917 : * Compute space needed for replication launcher shared memory
918 : */
919 : Size
920 8302 : ApplyLauncherShmemSize(void)
921 : {
922 : Size size;
923 :
924 : /*
925 : * Need the fixed struct and the array of LogicalRepWorker.
926 : */
927 8302 : size = sizeof(LogicalRepCtxStruct);
928 8302 : size = MAXALIGN(size);
929 8302 : size = add_size(size, mul_size(max_logical_replication_workers,
930 : sizeof(LogicalRepWorker)));
931 8302 : return size;
932 : }
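:
: /*
:  * For instance, with the default max_logical_replication_workers = 4 this
:  * works out to MAXALIGN(sizeof(LogicalRepCtxStruct)) plus
:  * 4 * sizeof(LogicalRepWorker). The flexible array member contributes no
:  * size of its own, so the array space must be added explicitly.
:  */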
933 :
934 : /*
935 : * ApplyLauncherRegister
936 : * Register a background worker running the logical replication launcher.
937 : */
938 : void
939 1730 : ApplyLauncherRegister(void)
940 : {
941 : BackgroundWorker bgw;
942 :
943 : /*
944 : * The logical replication launcher is disabled during binary upgrades, to
945 : * prevent logical replication workers from running on the source cluster.
946 : * That could cause replication origins to move forward after having been
947 : * copied to the target cluster, potentially creating conflicts with the
948 : * copied data files.
949 : */
950 1730 : if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
951 106 : return;
952 :
953 1624 : memset(&bgw, 0, sizeof(bgw));
954 1624 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
955 : BGWORKER_BACKEND_DATABASE_CONNECTION;
956 1624 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
957 1624 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
958 1624 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
959 1624 : snprintf(bgw.bgw_name, BGW_MAXLEN,
960 : "logical replication launcher");
961 1624 : snprintf(bgw.bgw_type, BGW_MAXLEN,
962 : "logical replication launcher");
963 1624 : bgw.bgw_restart_time = 5;
964 1624 : bgw.bgw_notify_pid = 0;
965 1624 : bgw.bgw_main_arg = (Datum) 0;
966 :
967 1624 : RegisterBackgroundWorker(&bgw);
968 : }
969 :
970 : /*
971 : * ApplyLauncherShmemInit
972 : * Allocate and initialize replication launcher shared memory
973 : */
974 : void
975 2152 : ApplyLauncherShmemInit(void)
976 : {
977 : bool found;
978 :
979 2152 : LogicalRepCtx = (LogicalRepCtxStruct *)
980 2152 : ShmemInitStruct("Logical Replication Launcher Data",
981 : ApplyLauncherShmemSize(),
982 : &found);
983 :
984 2152 : if (!found)
985 : {
986 : int slot;
987 :
988 2152 : memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
989 :
990 2152 : LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
991 2152 : LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
992 :
993 : /* Initialize memory and spin locks for each worker slot. */
994 10686 : for (slot = 0; slot < max_logical_replication_workers; slot++)
995 : {
996 8534 : LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
997 :
998 8534 : memset(worker, 0, sizeof(LogicalRepWorker));
999 8534 : SpinLockInit(&worker->relmutex);
1000 : }
1001 : }
1002 2152 : }
1003 :
1004 : /*
1005 : * Initialize or attach to the dynamic shared hash table that stores the
1006 : * last-start times, if not already done.
1007 : * This must be called before accessing the table.
1008 : */
1009 : static void
1010 1090 : logicalrep_launcher_attach_dshmem(void)
1011 : {
1012 : MemoryContext oldcontext;
1013 :
1014 : /* Quick exit if we already did this. */
1015 1090 : if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
1016 984 : last_start_times != NULL)
1017 734 : return;
1018 :
1019 : /* Otherwise, use a lock to ensure only one process creates the table. */
1020 356 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
1021 :
1022 : /* Be sure any local memory allocated by DSA routines is persistent. */
1023 356 : oldcontext = MemoryContextSwitchTo(TopMemoryContext);
1024 :
1025 356 : if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
1026 : {
1027 : /* Initialize dynamic shared hash table for last-start times. */
1028 106 : last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
1029 106 : dsa_pin(last_start_times_dsa);
1030 106 : dsa_pin_mapping(last_start_times_dsa);
1031 106 : last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
1032 :
1033 : /* Store handles in shared memory for other backends to use. */
1034 106 : LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
1035 106 : LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
1036 : }
1037 250 : else if (!last_start_times)
1038 : {
1039 : /* Attach to existing dynamic shared hash table. */
1040 250 : last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
1041 250 : dsa_pin_mapping(last_start_times_dsa);
1042 250 : last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
1043 250 : LogicalRepCtx->last_start_dsh, NULL);
1044 : }
1045 :
1046 356 : MemoryContextSwitchTo(oldcontext);
1047 356 : LWLockRelease(LogicalRepWorkerLock);
1048 : }
1049 :
1050 : /*
1051 : * Set the last-start time for the subscription.
1052 : */
1053 : static void
1054 310 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
1055 : {
1056 : LauncherLastStartTimesEntry *entry;
1057 : bool found;
1058 :
1059 310 : logicalrep_launcher_attach_dshmem();
1060 :
1061 310 : entry = dshash_find_or_insert(last_start_times, &subid, &found);
1062 310 : entry->last_start_time = start_time;
1063 310 : dshash_release_lock(last_start_times, entry);
1064 310 : }
1065 :
1066 : /*
1067 : * Return the last-start time for the subscription, or 0 if there isn't one.
1068 : */
1069 : static TimestampTz
1070 460 : ApplyLauncherGetWorkerStartTime(Oid subid)
1071 : {
1072 : LauncherLastStartTimesEntry *entry;
1073 : TimestampTz ret;
1074 :
1075 460 : logicalrep_launcher_attach_dshmem();
1076 :
1077 460 : entry = dshash_find(last_start_times, &subid, false);
1078 460 : if (entry == NULL)
1079 248 : return 0;
1080 :
1081 212 : ret = entry->last_start_time;
1082 212 : dshash_release_lock(last_start_times, entry);
1083 :
1084 212 : return ret;
1085 : }
1086 :
1087 : /*
1088 : * Remove the last-start-time entry for the subscription, if one exists.
1089 : *
1090 : * This has two use cases: to remove the entry related to a subscription
1091 : * that's been deleted or disabled (just to avoid leaking shared memory),
1092 : * and to allow immediate restart of an apply worker that has exited
1093 : * due to subscription parameter changes.
1094 : */
1095 : void
1096 320 : ApplyLauncherForgetWorkerStartTime(Oid subid)
1097 : {
1098 320 : logicalrep_launcher_attach_dshmem();
1099 :
1100 320 : (void) dshash_delete_key(last_start_times, &subid);
1101 320 : }
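:
: /*
:  * A sketch of the second use case above, assuming a hypothetical command
:  * that changes subscription parameters and wants the apply worker
:  * restarted promptly:
:  *
:  *	logicalrep_worker_stop(subid, InvalidOid);
:  *	ApplyLauncherForgetWorkerStartTime(subid);
:  *	ApplyLauncherWakeupAtCommit();
:  */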
1102 :
1103 : /*
1104 : * Wakeup the launcher on commit if requested.
1105 : */
1106 : void
1107 1096240 : AtEOXact_ApplyLauncher(bool isCommit)
1108 : {
1109 1096240 : if (isCommit)
1110 : {
1111 1046376 : if (on_commit_launcher_wakeup)
1112 278 : ApplyLauncherWakeup();
1113 : }
1114 :
1115 1096240 : on_commit_launcher_wakeup = false;
1116 1096240 : }
1117 :
1118 : /*
1119 : * Request wakeup of the launcher on commit of the transaction.
1120 : *
1121 : * This is used to signal the launcher to stop sleeping and process the
1122 : * subscriptions when the current transaction commits. It should be used
1123 : * when a new tuple has been added to the pg_subscription catalog.
1124 : */
1125 : void
1126 278 : ApplyLauncherWakeupAtCommit(void)
1127 : {
1128 278 : if (!on_commit_launcher_wakeup)
1129 278 : on_commit_launcher_wakeup = true;
1130 278 : }
1131 :
1132 : /*
1133 : * Wakeup the launcher immediately.
1134 : */
1135 : void
1136 1232 : ApplyLauncherWakeup(void)
1137 : {
1138 1232 : if (LogicalRepCtx->launcher_pid != 0)
1139 1212 : kill(LogicalRepCtx->launcher_pid, SIGUSR1);
1140 1232 : }
1141 :
1142 : /*
1143 : * Main loop for the apply launcher process.
1144 : */
1145 : void
1146 818 : ApplyLauncherMain(Datum main_arg)
1147 : {
1148 818 : ereport(DEBUG1,
1149 : (errmsg_internal("logical replication launcher started")));
1150 :
1151 818 : before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
1152 :
1153 : Assert(LogicalRepCtx->launcher_pid == 0);
1154 818 : LogicalRepCtx->launcher_pid = MyProcPid;
1155 :
1156 : /* Establish signal handlers. */
1157 818 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
1158 818 : pqsignal(SIGTERM, die);
1159 818 : BackgroundWorkerUnblockSignals();
1160 :
1161 : /*
1162 : * Establish connection to nailed catalogs (we only ever access
1163 : * pg_subscription).
1164 : */
1165 818 : BackgroundWorkerInitializeConnection(NULL, NULL, 0);
1166 :
1167 : /*
1168 : * Acquire the conflict detection slot at startup to ensure it can be
1169 : * dropped if no longer needed after a restart.
1170 : */
1171 818 : acquire_conflict_slot_if_exists();
1172 :
1173 : /* Enter main loop */
1174 : for (;;)
1175 4504 : {
1176 : int rc;
1177 : List *sublist;
1178 : ListCell *lc;
1179 : MemoryContext subctx;
1180 : MemoryContext oldctx;
1181 5322 : long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
1182 5322 : bool can_advance_xmin = true;
1183 5322 : bool retain_dead_tuples = false;
1184 5322 : TransactionId xmin = InvalidTransactionId;
1185 :
1186 5322 : CHECK_FOR_INTERRUPTS();
1187 :
1188 : /* Use temporary context to avoid leaking memory across cycles. */
1189 5318 : subctx = AllocSetContextCreate(TopMemoryContext,
1190 : "Logical Replication Launcher sublist",
1191 : ALLOCSET_DEFAULT_SIZES);
1192 5318 : oldctx = MemoryContextSwitchTo(subctx);
1193 :
1194 : /*
1195 : * Start any missing workers for enabled subscriptions.
1196 : *
1197 : * Also, during the iteration through all subscriptions, we compute
1198 : * the minimum XID required to protect deleted tuples for conflict
1199 : * detection if one of the subscriptions enables the retain_dead_tuples
1200 : * option.
1201 : */
1202 5318 : sublist = get_subscription_list();
1203 6702 : foreach(lc, sublist)
1204 : {
1205 1384 : Subscription *sub = (Subscription *) lfirst(lc);
1206 : LogicalRepWorker *w;
1207 : TimestampTz last_start;
1208 : TimestampTz now;
1209 : long elapsed;
1210 :
1211 1384 : if (sub->retaindeadtuples)
1212 : {
1213 12 : retain_dead_tuples = true;
1214 :
1215 : /*
1216 : * Can't advance xmin of the slot unless all the subscriptions
1217 : * with retain_dead_tuples are enabled. This is required to
1218 : * ensure that we don't advance the xmin of
1219 : * CONFLICT_DETECTION_SLOT if one of the subscriptions is not
1220 : * enabled. Otherwise, we won't be able to detect conflicts
1221 : * reliably for such a subscription even though it has set the
1222 : * retain_dead_tuples option.
1223 : */
1224 12 : can_advance_xmin &= sub->enabled;
1225 :
1226 : /*
1227 : * Create a replication slot to retain information necessary
1228 : * for conflict detection such as dead tuples, commit
1229 : * timestamps, and origins.
1230 : *
1231 : * The slot is created before starting the apply worker to
1232 : * prevent it from unnecessarily maintaining its
1233 : * oldest_nonremovable_xid.
1234 : *
1235 : * The slot is created even for a disabled subscription to
1236 : * ensure that conflict-related information is available when
1237 : * applying remote changes that occurred before the
1238 : * subscription was enabled.
1239 : */
1240 12 : CreateConflictDetectionSlot();
1241 : }
1242 :
1243 1384 : if (!sub->enabled)
1244 62 : continue;
1245 :
1246 1322 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1247 1322 : w = logicalrep_worker_find(sub->oid, InvalidOid, false);
1248 1322 : LWLockRelease(LogicalRepWorkerLock);
1249 :
1250 1322 : if (w != NULL)
1251 : {
1252 : /*
1253 : * Compute the minimum xmin required to protect the dead tuples
1254 : * needed for conflict detection among all running apply
1255 : * workers that enable retain_dead_tuples.
1256 : */
1257 862 : if (sub->retaindeadtuples && can_advance_xmin)
1258 8 : compute_min_nonremovable_xid(w, &xmin);
1259 :
1260 : /* worker is running already */
1261 862 : continue;
1262 : }
1263 :
1264 : /*
1265 : * Can't advance xmin of the slot unless all the workers
1266 : * corresponding to subscriptions with retain_dead_tuples are
1267 : * running, so disable further computation of the minimum
1268 : * nonremovable xid.
1269 : */
1270 460 : if (sub->retaindeadtuples)
1271 2 : can_advance_xmin = false;
1272 :
1273 : /*
1274 : * If the worker is eligible to start now, launch it. Otherwise,
1275 : * adjust wait_time so that we'll wake up as soon as it can be
1276 : * started.
1277 : *
1278 : * Each subscription's apply worker can only be restarted once per
1279 : * wal_retrieve_retry_interval, so that errors do not cause us to
1280 : * repeatedly restart the worker as fast as possible. In cases
1281 : * where a restart is expected (e.g., subscription parameter
1282 : * changes), another process should remove the last-start entry
1283 : * for the subscription so that the worker can be restarted
1284 : * without waiting for wal_retrieve_retry_interval to elapse.
1285 : */
1286 460 : last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
1287 460 : now = GetCurrentTimestamp();
1288 460 : if (last_start == 0 ||
1289 212 : (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
1290 : {
1291 310 : ApplyLauncherSetWorkerStartTime(sub->oid, now);
1292 310 : if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
1293 310 : sub->dbid, sub->oid, sub->name,
1294 : sub->owner, InvalidOid,
1295 : DSM_HANDLE_INVALID,
1296 310 : sub->retaindeadtuples))
1297 : {
1298 : /*
1299 : * We get here either if we failed to launch a worker
1300 : * (perhaps for resource-exhaustion reasons) or if we
1301 : * launched one but it immediately quit. Either way, it
1302 : * seems appropriate to try again after
1303 : * wal_retrieve_retry_interval.
1304 : */
1305 6 : wait_time = Min(wait_time,
1306 : wal_retrieve_retry_interval);
1307 : }
1308 : }
1309 : else
1310 : {
1311 150 : wait_time = Min(wait_time,
1312 : wal_retrieve_retry_interval - elapsed);
1313 : }
1314 : }
1315 :
1316 : /*
1317 : * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
1318 : * that requires us to retain dead tuples. Otherwise, if required,
1319 : * advance the slot's xmin to protect the dead tuples required for
1320 : * conflict detection.
1321 : */
1322 5318 : if (MyReplicationSlot)
1323 : {
1324 14 : if (!retain_dead_tuples)
1325 2 : ReplicationSlotDropAcquired();
1326 12 : else if (can_advance_xmin)
1327 8 : advance_conflict_slot_xmin(xmin);
1328 : }
1329 :
1330 : /* Switch back to original memory context. */
1331 5318 : MemoryContextSwitchTo(oldctx);
1332 : /* Clean the temporary memory. */
1333 5318 : MemoryContextDelete(subctx);
1334 :
1335 : /* Wait for more work. */
1336 5318 : rc = WaitLatch(MyLatch,
1337 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1338 : wait_time,
1339 : WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
1340 :
1341 5312 : if (rc & WL_LATCH_SET)
1342 : {
1343 5282 : ResetLatch(MyLatch);
1344 5282 : CHECK_FOR_INTERRUPTS();
1345 : }
1346 :
1347 4504 : if (ConfigReloadPending)
1348 : {
1349 72 : ConfigReloadPending = false;
1350 72 : ProcessConfigFile(PGC_SIGHUP);
1351 : }
1352 : }
1353 :
1354 : /* Not reachable */
1355 : }
1356 :
1357 : /*
1358 : * Determine the minimum non-removable transaction ID across all apply workers
1359 : * for subscriptions that have retain_dead_tuples enabled. Store the result
1360 : * in *xmin.
1361 : */
1362 : static void
1363 8 : compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
1364 : {
1365 : TransactionId nonremovable_xid;
1366 :
1367 : Assert(worker != NULL);
1368 :
1369 : /*
1370 : * The replication slot for conflict detection must be created before the
1371 : * worker starts.
1372 : */
1373 : Assert(MyReplicationSlot);
1374 :
1375 8 : SpinLockAcquire(&worker->relmutex);
1376 8 : nonremovable_xid = worker->oldest_nonremovable_xid;
1377 8 : SpinLockRelease(&worker->relmutex);
1378 :
1379 : Assert(TransactionIdIsValid(nonremovable_xid));
1380 :
1381 8 : if (!TransactionIdIsValid(*xmin) ||
1382 0 : TransactionIdPrecedes(nonremovable_xid, *xmin))
1383 8 : *xmin = nonremovable_xid;
1384 8 : }
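:
: /*
:  * For example, if two qualifying workers report oldest_nonremovable_xid
:  * values of 742 and 738, successive calls leave *xmin at 738. The
:  * comparison uses TransactionIdPrecedes(), so the result stays correct
:  * across transaction-ID wraparound.
:  */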
1385 :
1386 : /*
1387 : * Acquire the replication slot used to retain information for conflict
1388 : * detection, if it exists.
1389 : *
1390 : * Return true if successfully acquired, otherwise return false.
1391 : */
1392 : static bool
1393 818 : acquire_conflict_slot_if_exists(void)
1394 : {
1395 818 : if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
1396 818 : return false;
1397 :
1398 0 : ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
1399 0 : return true;
1400 : }
1401 :
1402 : /*
1403 : * Advance the xmin of the replication slot used to retain information
1404 : * required for conflict detection.
1405 : */
1406 : static void
1407 8 : advance_conflict_slot_xmin(TransactionId new_xmin)
1408 : {
1409 : Assert(MyReplicationSlot);
1410 : Assert(TransactionIdIsValid(new_xmin));
1411 : Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
1412 :
1413 : /* Return if the xmin value of the slot cannot be advanced */
1414 8 : if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
1415 6 : return;
1416 :
1417 2 : SpinLockAcquire(&MyReplicationSlot->mutex);
1418 2 : MyReplicationSlot->effective_xmin = new_xmin;
1419 2 : MyReplicationSlot->data.xmin = new_xmin;
1420 2 : SpinLockRelease(&MyReplicationSlot->mutex);
1421 :
1422 2 : elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
1423 :
1424 2 : ReplicationSlotMarkDirty();
1425 2 : ReplicationSlotsComputeRequiredXmin(false);
1426 :
1427 : /*
1428 : * Like PhysicalConfirmReceivedLocation(), do not save slot information
1429 : * each time. This is acceptable because all concurrent transactions on
1430 : * the publisher that require the data preceding the slot's xmin should
1431 : * have already been applied and flushed on the subscriber before the xmin
1432 : * is advanced. So, even if the slot's xmin regresses after a restart, it
1433 : * will be advanced again in the next cycle. Therefore, no data required
1434 : * for conflict detection will be prematurely removed.
1435 : */
1436 2 : return;
1437 : }
1438 :
1439 : /*
1440 : * Create and acquire the replication slot used to retain information for
1441 : * conflict detection, if not done already.
1442 : */
1443 : void
1444 14 : CreateConflictDetectionSlot(void)
1445 : {
1446 : TransactionId xmin_horizon;
1447 :
1448 : /* Exit early, if the replication slot is already created and acquired */
1449 : /* Exit early if the replication slot has already been created and acquired */
1450 8 : return;
1451 :
1452 6 : ereport(LOG,
1453 : errmsg("creating replication conflict detection slot"));
1454 :
1455 6 : ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
1456 : false, false);
1457 :
1458 6 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
1459 :
1460 6 : xmin_horizon = GetOldestSafeDecodingTransactionId(false);
1461 :
1462 6 : SpinLockAcquire(&MyReplicationSlot->mutex);
1463 6 : MyReplicationSlot->effective_xmin = xmin_horizon;
1464 6 : MyReplicationSlot->data.xmin = xmin_horizon;
1465 6 : SpinLockRelease(&MyReplicationSlot->mutex);
1466 :
1467 6 : ReplicationSlotsComputeRequiredXmin(true);
1468 :
1469 6 : LWLockRelease(ProcArrayLock);
1470 :
1471 : /* Write this slot to disk */
1472 6 : ReplicationSlotMarkDirty();
1473 6 : ReplicationSlotSave();
1474 : }
1475 :
1476 : /*
1477 : * Is current process the logical replication launcher?
1478 : */
1479 : bool
1480 4832 : IsLogicalLauncher(void)
1481 : {
1482 4832 : return LogicalRepCtx->launcher_pid == MyProcPid;
1483 : }
1484 :
1485 : /*
1486 : * Return the pid of the leader apply worker if the given pid is the pid of a
1487 : * parallel apply worker, otherwise, return InvalidPid.
1488 : */
1489 : pid_t
1490 1830 : GetLeaderApplyWorkerPid(pid_t pid)
1491 : {
1492 1830 : int leader_pid = InvalidPid;
1493 : int i;
1494 :
1495 1830 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1496 :
1497 9150 : for (i = 0; i < max_logical_replication_workers; i++)
1498 : {
1499 7320 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
1500 :
1501 7320 : if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
1502 : {
1503 0 : leader_pid = w->leader_pid;
1504 0 : break;
1505 : }
1506 : }
1507 :
1508 1830 : LWLockRelease(LogicalRepWorkerLock);
1509 :
1510 1830 : return leader_pid;
1511 : }
1512 :
1513 : /*
1514 : * Returns state of the subscriptions.
1515 : */
1516 : Datum
1517 2 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
1518 : {
1519 : #define PG_STAT_GET_SUBSCRIPTION_COLS 10
1520 2 : Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
1521 : int i;
1522 2 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1523 :
1524 2 : InitMaterializedSRF(fcinfo, 0);
1525 :
1526 : /* Make sure we get consistent view of the workers. */
1527 2 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1528 :
1529 10 : for (i = 0; i < max_logical_replication_workers; i++)
1530 : {
1531 : /* for each row */
1532 8 : Datum values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1533 8 : bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1534 : int worker_pid;
1535 : LogicalRepWorker worker;
1536 :
1537 8 : memcpy(&worker, &LogicalRepCtx->workers[i],
1538 : sizeof(LogicalRepWorker));
1539 8 : if (!worker.proc || !IsBackendPid(worker.proc->pid))
1540 4 : continue;
1541 :
1542 4 : if (OidIsValid(subid) && worker.subid != subid)
1543 0 : continue;
1544 :
1545 4 : worker_pid = worker.proc->pid;
1546 :
1547 4 : values[0] = ObjectIdGetDatum(worker.subid);
1548 4 : if (isTablesyncWorker(&worker))
1549 0 : values[1] = ObjectIdGetDatum(worker.relid);
1550 : else
1551 4 : nulls[1] = true;
1552 4 : values[2] = Int32GetDatum(worker_pid);
1553 :
1554 4 : if (isParallelApplyWorker(&worker))
1555 0 : values[3] = Int32GetDatum(worker.leader_pid);
1556 : else
1557 4 : nulls[3] = true;
1558 :
1559 4 : if (XLogRecPtrIsInvalid(worker.last_lsn))
1560 2 : nulls[4] = true;
1561 : else
1562 2 : values[4] = LSNGetDatum(worker.last_lsn);
1563 4 : if (worker.last_send_time == 0)
1564 0 : nulls[5] = true;
1565 : else
1566 4 : values[5] = TimestampTzGetDatum(worker.last_send_time);
1567 4 : if (worker.last_recv_time == 0)
1568 0 : nulls[6] = true;
1569 : else
1570 4 : values[6] = TimestampTzGetDatum(worker.last_recv_time);
1571 4 : if (XLogRecPtrIsInvalid(worker.reply_lsn))
1572 2 : nulls[7] = true;
1573 : else
1574 2 : values[7] = LSNGetDatum(worker.reply_lsn);
1575 4 : if (worker.reply_time == 0)
1576 0 : nulls[8] = true;
1577 : else
1578 4 : values[8] = TimestampTzGetDatum(worker.reply_time);
1579 :
1580 4 : switch (worker.type)
1581 : {
1582 4 : case WORKERTYPE_APPLY:
1583 4 : values[9] = CStringGetTextDatum("apply");
1584 4 : break;
1585 0 : case WORKERTYPE_PARALLEL_APPLY:
1586 0 : values[9] = CStringGetTextDatum("parallel apply");
1587 0 : break;
1588 0 : case WORKERTYPE_TABLESYNC:
1589 0 : values[9] = CStringGetTextDatum("table synchronization");
1590 0 : break;
1591 0 : case WORKERTYPE_UNKNOWN:
1592 : /* Should never happen. */
1593 0 : elog(ERROR, "unknown worker type");
1594 : }
1595 :
1596 4 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1597 : values, nulls);
1598 :
1599 : /*
1600 : * If only a single subscription was requested, and we found it,
1601 : * break.
1602 : */
1603 4 : if (OidIsValid(subid))
1604 0 : break;
1605 : }
1606 :
1607 2 : LWLockRelease(LogicalRepWorkerLock);
1608 :
1609 2 : return (Datum) 0;
1610 : }