Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * launcher.c
3 : * PostgreSQL logical replication worker launcher process
4 : *
5 : * Copyright (c) 2016-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/launcher.c
9 : *
10 : * NOTES
11 : * This module contains the logical replication worker launcher which
12 : * uses the background worker infrastructure to start the logical
13 : * replication workers for every enabled subscription.
14 : *
15 : *-------------------------------------------------------------------------
16 : */
17 :
18 : #include "postgres.h"
19 :
20 : #include "access/heapam.h"
21 : #include "access/htup.h"
22 : #include "access/htup_details.h"
23 : #include "access/tableam.h"
24 : #include "access/xact.h"
25 : #include "catalog/pg_subscription.h"
26 : #include "catalog/pg_subscription_rel.h"
27 : #include "funcapi.h"
28 : #include "lib/dshash.h"
29 : #include "miscadmin.h"
30 : #include "pgstat.h"
31 : #include "postmaster/bgworker.h"
32 : #include "postmaster/interrupt.h"
33 : #include "replication/logicallauncher.h"
34 : #include "replication/origin.h"
35 : #include "replication/slot.h"
36 : #include "replication/walreceiver.h"
37 : #include "replication/worker_internal.h"
38 : #include "storage/ipc.h"
39 : #include "storage/proc.h"
40 : #include "storage/procarray.h"
41 : #include "tcop/tcopprot.h"
42 : #include "utils/builtins.h"
43 : #include "utils/memutils.h"
44 : #include "utils/pg_lsn.h"
45 : #include "utils/snapmgr.h"
46 : #include "utils/syscache.h"
47 :
48 : /* max sleep time between cycles (3min) */
49 : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
50 :
51 : /* GUC variables */
52 : int max_logical_replication_workers = 4;
53 : int max_sync_workers_per_subscription = 2;
54 : int max_parallel_apply_workers_per_subscription = 2;
55 :
56 : LogicalRepWorker *MyLogicalRepWorker = NULL;
57 :
58 : typedef struct LogicalRepCtxStruct
59 : {
60 : /* Supervisor process. */
61 : pid_t launcher_pid;
62 :
63 : /* Hash table holding last start times of subscriptions' apply workers. */
64 : dsa_handle last_start_dsa;
65 : dshash_table_handle last_start_dsh;
66 :
67 : /* Background workers. */
68 : LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
69 : } LogicalRepCtxStruct;
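
(Note: workers[] is a flexible array member, so sizeof(LogicalRepCtxStruct) covers only the fixed fields; ApplyLauncherShmemSize() below adds room for max_logical_replication_workers entries when this struct is carved out of shared memory.)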
70 :
71 : static LogicalRepCtxStruct *LogicalRepCtx;
72 :
73 : /* an entry in the last-start-times shared hash table */
74 : typedef struct LauncherLastStartTimesEntry
75 : {
76 : Oid subid; /* OID of logrep subscription (hash key) */
77 : TimestampTz last_start_time; /* last time its apply worker was started */
78 : } LauncherLastStartTimesEntry;
79 :
80 : /* parameters for the last-start-times shared hash table */
81 : static const dshash_parameters dsh_params = {
82 : sizeof(Oid),
83 : sizeof(LauncherLastStartTimesEntry),
84 : dshash_memcmp,
85 : dshash_memhash,
86 : dshash_memcpy,
87 : LWTRANCHE_LAUNCHER_HASH
88 : };
89 :
90 : static dsa_area *last_start_times_dsa = NULL;
91 : static dshash_table *last_start_times = NULL;
92 :
93 : static bool on_commit_launcher_wakeup = false;
94 :
95 :
96 : static void logicalrep_launcher_onexit(int code, Datum arg);
97 : static void logicalrep_worker_onexit(int code, Datum arg);
98 : static void logicalrep_worker_detach(void);
99 : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
100 : static int logicalrep_pa_worker_count(Oid subid);
101 : static void logicalrep_launcher_attach_dshmem(void);
102 : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
103 : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
104 : static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
105 : static bool acquire_conflict_slot_if_exists(void);
106 : static void update_conflict_slot_xmin(TransactionId new_xmin);
107 : static void init_conflict_slot_xmin(void);
108 :
109 :
110 : /*
111 : * Load the list of subscriptions.
112 : *
113 : * Only the fields interesting for worker start/stop functions are filled for
114 : * each subscription.
115 : */
116 : static List *
117 5696 : get_subscription_list(void)
118 : {
119 5696 : List *res = NIL;
120 : Relation rel;
121 : TableScanDesc scan;
122 : HeapTuple tup;
123 : MemoryContext resultcxt;
124 :
125 : /* This is the context that we will allocate our output data in */
126 5696 : resultcxt = CurrentMemoryContext;
127 :
128 : /*
129 : * Start a transaction so we can access pg_subscription.
130 : */
131 5696 : StartTransactionCommand();
132 :
133 5696 : rel = table_open(SubscriptionRelationId, AccessShareLock);
134 5696 : scan = table_beginscan_catalog(rel, 0, NULL);
135 :
136 7528 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
137 : {
138 1832 : Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
139 : Subscription *sub;
140 : MemoryContext oldcxt;
141 :
142 : /*
143 : * Allocate our results in the caller's context, not the
144 : * transaction's. We do this inside the loop, and restore the original
145 : * context at the end, so that leaky things like heap_getnext() are
146 : * not called in a potentially long-lived context.
147 : */
148 1832 : oldcxt = MemoryContextSwitchTo(resultcxt);
149 :
150 1832 : sub = (Subscription *) palloc0(sizeof(Subscription));
151 1832 : sub->oid = subform->oid;
152 1832 : sub->dbid = subform->subdbid;
153 1832 : sub->owner = subform->subowner;
154 1832 : sub->enabled = subform->subenabled;
155 1832 : sub->name = pstrdup(NameStr(subform->subname));
156 1832 : sub->retaindeadtuples = subform->subretaindeadtuples;
157 1832 : sub->retentionactive = subform->subretentionactive;
158 : /* We don't fill fields we are not interested in. */
159 :
160 1832 : res = lappend(res, sub);
161 1832 : MemoryContextSwitchTo(oldcxt);
162 : }
163 :
164 5696 : table_endscan(scan);
165 5696 : table_close(rel, AccessShareLock);
166 :
167 5696 : CommitTransactionCommand();
168 :
169 5696 : return res;
170 : }
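
The context dance above is the standard way to return data out of a transaction: anything allocated in the transaction's memory context disappears at CommitTransactionCommand(), so results are copied into the caller's context while transient allocations (e.g. from heap_getnext()) are left to die with the transaction. A minimal sketch of the same discipline, with hypothetical fetch_next_item()/copy_item() standing in for the catalog scan:

#include "postgres.h"
#include "access/xact.h"
#include "nodes/pg_list.h"

extern void *fetch_next_item(void); /* hypothetical; pallocs transiently */
extern void *copy_item(void *item); /* hypothetical; pallocs in current cxt */

static List *
collect_items(void)
{
    List       *res = NIL;
    MemoryContext resultcxt = CurrentMemoryContext;  /* caller's context */

    StartTransactionCommand();

    for (;;)
    {
        MemoryContext oldcxt;
        void       *item = fetch_next_item();

        if (item == NULL)
            break;

        /* Copy only what we keep into the caller's context. */
        oldcxt = MemoryContextSwitchTo(resultcxt);
        res = lappend(res, copy_item(item));
        MemoryContextSwitchTo(oldcxt);
    }

    /* Transient allocations vanish here; res survives in resultcxt. */
    CommitTransactionCommand();

    return res;
}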
171 :
172 : /*
173 : * Wait for a background worker to start up and attach to the shmem context.
174 : *
175 : * This is only needed for cleaning up the shared memory in case the worker
176 : * fails to attach.
177 : *
178 : * Returns whether the attach was successful.
179 : */
180 : static bool
181 838 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
182 : uint16 generation,
183 : BackgroundWorkerHandle *handle)
184 : {
185 838 : bool result = false;
186 838 : bool dropped_latch = false;
187 :
188 : for (;;)
189 3208 : {
190 : BgwHandleStatus status;
191 : pid_t pid;
192 : int rc;
193 :
194 4046 : CHECK_FOR_INTERRUPTS();
195 :
196 4046 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
197 :
198 : /* Worker either died or has started. Return false if died. */
199 4046 : if (!worker->in_use || worker->proc)
200 : {
201 836 : result = worker->in_use;
202 836 : LWLockRelease(LogicalRepWorkerLock);
203 836 : break;
204 : }
205 :
206 3210 : LWLockRelease(LogicalRepWorkerLock);
207 :
208 : /* Check if worker has died before attaching, and clean up after it. */
209 3210 : status = GetBackgroundWorkerPid(handle, &pid);
210 :
211 3210 : if (status == BGWH_STOPPED)
212 : {
213 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
214 : /* Ensure that this was indeed the worker we waited for. */
215 0 : if (generation == worker->generation)
216 0 : logicalrep_worker_cleanup(worker);
217 0 : LWLockRelease(LogicalRepWorkerLock);
218 0 : break; /* result is already false */
219 : }
220 :
221 : /*
222 : * We need timeout because we generally don't get notified via latch
223 : * about the worker attach. But we don't expect to have to wait long.
224 : */
225 3210 : rc = WaitLatch(MyLatch,
226 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
227 : 10L, WAIT_EVENT_BGWORKER_STARTUP);
228 :
229 3210 : if (rc & WL_LATCH_SET)
230 : {
231 1014 : ResetLatch(MyLatch);
232 1014 : CHECK_FOR_INTERRUPTS();
233 1012 : dropped_latch = true;
234 : }
235 : }
236 :
237 : /*
238 : * If we had to clear a latch event in order to wait, be sure to restore
239 : * it before exiting. Otherwise caller may miss events.
240 : */
241 836 : if (dropped_latch)
242 836 : SetLatch(MyLatch);
243 :
244 836 : return result;
245 : }
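
The loop above is the canonical latch wait: sleep with a timeout, and on wakeup reset the latch before rechecking shared state, so a SetLatch() arriving between the check and the sleep is never lost. A self-contained sketch of the idiom, with a hypothetical shared_state_ready() predicate in place of the worker->proc check:

#include "postgres.h"
#include "miscadmin.h"
#include "storage/latch.h"
#include "utils/wait_event.h"

extern bool shared_state_ready(void);   /* hypothetical predicate */

static void
wait_for_shared_state(void)
{
    for (;;)
    {
        int         rc;

        CHECK_FOR_INTERRUPTS();

        if (shared_state_ready())
            break;

        /* Sleep until signaled, but recheck at least every 10ms. */
        rc = WaitLatch(MyLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                       10L, WAIT_EVENT_BGWORKER_STARTUP);

        if (rc & WL_LATCH_SET)
            ResetLatch(MyLatch);    /* reset before the next recheck */
    }
}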
246 :
247 : /*
248 : * Walks the workers array and searches for one that matches given
249 : * subscription id and relid.
250 : *
251 : * We are only interested in the leader apply worker or table sync worker.
252 : */
253 : LogicalRepWorker *
254 6138 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
255 : {
256 : int i;
257 6138 : LogicalRepWorker *res = NULL;
258 :
259 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
260 :
261 : /* Search for attached worker for a given subscription id. */
262 18514 : for (i = 0; i < max_logical_replication_workers; i++)
263 : {
264 16166 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
265 :
266 : /* Skip parallel apply workers. */
267 16166 : if (isParallelApplyWorker(w))
268 0 : continue;
269 :
270 16166 : if (w->in_use && w->subid == subid && w->relid == relid &&
271 3790 : (!only_running || w->proc))
272 : {
273 3790 : res = w;
274 3790 : break;
275 : }
276 : }
277 :
278 6138 : return res;
279 : }
280 :
281 : /*
282 : * Similar to logicalrep_worker_find(), but returns a list of all workers for
283 : * the subscription, instead of just one.
284 : */
285 : List *
286 1336 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
287 : {
288 : int i;
289 1336 : List *res = NIL;
290 :
291 1336 : if (acquire_lock)
292 236 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
293 :
294 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
295 :
296 : /* Search for attached worker for a given subscription id. */
297 6936 : for (i = 0; i < max_logical_replication_workers; i++)
298 : {
299 5600 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
300 :
301 5600 : if (w->in_use && w->subid == subid && (!only_running || w->proc))
302 958 : res = lappend(res, w);
303 : }
304 :
305 1336 : if (acquire_lock)
306 236 : LWLockRelease(LogicalRepWorkerLock);
307 :
308 1336 : return res;
309 : }
310 :
311 : /*
312 : * Start new logical replication background worker, if possible.
313 : *
314 : * Returns true on success, false on failure.
315 : */
316 : bool
317 846 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
318 : Oid dbid, Oid subid, const char *subname, Oid userid,
319 : Oid relid, dsm_handle subworker_dsm,
320 : bool retain_dead_tuples)
321 : {
322 : BackgroundWorker bgw;
323 : BackgroundWorkerHandle *bgw_handle;
324 : uint16 generation;
325 : int i;
326 846 : int slot = 0;
327 846 : LogicalRepWorker *worker = NULL;
328 : int nsyncworkers;
329 : int nparallelapplyworkers;
330 : TimestampTz now;
331 846 : bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
332 846 : bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
333 :
334 : /*----------
335 : * Sanity checks:
336 : * - must be valid worker type
337 : * - tablesync workers are the only ones to have relid
338 : * - parallel apply worker is the only kind of subworker
339 : * - The replication slot used in conflict detection is created when
340 : * retain_dead_tuples is enabled
341 : */
342 : Assert(wtype != WORKERTYPE_UNKNOWN);
343 : Assert(is_tablesync_worker == OidIsValid(relid));
344 : Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
345 : Assert(!retain_dead_tuples || MyReplicationSlot);
346 :
347 846 : ereport(DEBUG1,
348 : (errmsg_internal("starting logical replication worker for subscription \"%s\"",
349 : subname)));
350 :
351 : /* Report this after the initial starting message for consistency. */
352 846 : if (max_active_replication_origins == 0)
353 0 : ereport(ERROR,
354 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
355 : errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
356 :
357 : /*
358 : * We need to modify the shared memory under lock so that we have a
359 : * consistent view.
360 : */
361 846 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
362 :
363 846 : retry:
364 : /* Find unused worker slot. */
365 1458 : for (i = 0; i < max_logical_replication_workers; i++)
366 : {
367 1458 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
368 :
369 1458 : if (!w->in_use)
370 : {
371 846 : worker = w;
372 846 : slot = i;
373 846 : break;
374 : }
375 : }
376 :
377 846 : nsyncworkers = logicalrep_sync_worker_count(subid);
378 :
379 846 : now = GetCurrentTimestamp();
380 :
381 : /*
382 : * If we didn't find a free slot, try to do garbage collection: if some
383 : * worker failed to start up and its parent crashed while waiting, the
384 : * in_use state was never cleared.
385 : */
386 846 : if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
387 : {
388 0 : bool did_cleanup = false;
389 :
390 0 : for (i = 0; i < max_logical_replication_workers; i++)
391 : {
392 0 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
393 :
394 : /*
395 : * If the worker was marked in use but didn't manage to attach in
396 : * time, clean it up.
397 : */
398 0 : if (w->in_use && !w->proc &&
399 0 : TimestampDifferenceExceeds(w->launch_time, now,
400 : wal_receiver_timeout))
401 : {
402 0 : elog(WARNING,
403 : "logical replication worker for subscription %u took too long to start; canceled",
404 : w->subid);
405 :
406 0 : logicalrep_worker_cleanup(w);
407 0 : did_cleanup = true;
408 : }
409 : }
410 :
411 0 : if (did_cleanup)
412 0 : goto retry;
413 : }
414 :
415 : /*
416 : * We don't allow starting more sync workers once we have reached the
417 : * sync worker limit per subscription. So, just return silently as we
418 : * might get here because of an otherwise harmless race condition.
419 : */
420 846 : if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
421 : {
422 0 : LWLockRelease(LogicalRepWorkerLock);
423 0 : return false;
424 : }
425 :
426 846 : nparallelapplyworkers = logicalrep_pa_worker_count(subid);
427 :
428 : /*
429 : * Return false if the number of parallel apply workers reached the limit
430 : * per subscription.
431 : */
432 846 : if (is_parallel_apply_worker &&
433 20 : nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
434 : {
435 0 : LWLockRelease(LogicalRepWorkerLock);
436 0 : return false;
437 : }
438 :
439 : /*
440 : * However, if there are no more free worker slots, inform the user about
441 : * it before exiting.
442 : */
443 846 : if (worker == NULL)
444 : {
445 0 : LWLockRelease(LogicalRepWorkerLock);
446 0 : ereport(WARNING,
447 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
448 : errmsg("out of logical replication worker slots"),
449 : errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
450 0 : return false;
451 : }
452 :
453 : /* Prepare the worker slot. */
454 846 : worker->type = wtype;
455 846 : worker->launch_time = now;
456 846 : worker->in_use = true;
457 846 : worker->generation++;
458 846 : worker->proc = NULL;
459 846 : worker->dbid = dbid;
460 846 : worker->userid = userid;
461 846 : worker->subid = subid;
462 846 : worker->relid = relid;
463 846 : worker->relstate = SUBREL_STATE_UNKNOWN;
464 846 : worker->relstate_lsn = InvalidXLogRecPtr;
465 846 : worker->stream_fileset = NULL;
466 846 : worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
467 846 : worker->parallel_apply = is_parallel_apply_worker;
468 846 : worker->oldest_nonremovable_xid = retain_dead_tuples
469 4 : ? MyReplicationSlot->data.xmin
470 846 : : InvalidTransactionId;
471 846 : worker->last_lsn = InvalidXLogRecPtr;
472 846 : TIMESTAMP_NOBEGIN(worker->last_send_time);
473 846 : TIMESTAMP_NOBEGIN(worker->last_recv_time);
474 846 : worker->reply_lsn = InvalidXLogRecPtr;
475 846 : TIMESTAMP_NOBEGIN(worker->reply_time);
476 :
477 : /* Before releasing lock, remember generation for future identification. */
478 846 : generation = worker->generation;
479 :
480 846 : LWLockRelease(LogicalRepWorkerLock);
481 :
482 : /* Register the new dynamic worker. */
483 846 : memset(&bgw, 0, sizeof(bgw));
484 846 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
485 : BGWORKER_BACKEND_DATABASE_CONNECTION;
486 846 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
487 846 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
488 :
489 846 : switch (worker->type)
490 : {
491 430 : case WORKERTYPE_APPLY:
492 430 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
493 430 : snprintf(bgw.bgw_name, BGW_MAXLEN,
494 : "logical replication apply worker for subscription %u",
495 : subid);
496 430 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
497 430 : break;
498 :
499 20 : case WORKERTYPE_PARALLEL_APPLY:
500 20 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
501 20 : snprintf(bgw.bgw_name, BGW_MAXLEN,
502 : "logical replication parallel apply worker for subscription %u",
503 : subid);
504 20 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
505 :
506 20 : memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
507 20 : break;
508 :
509 396 : case WORKERTYPE_TABLESYNC:
510 396 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
511 396 : snprintf(bgw.bgw_name, BGW_MAXLEN,
512 : "logical replication tablesync worker for subscription %u sync %u",
513 : subid,
514 : relid);
515 396 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
516 396 : break;
517 :
518 0 : case WORKERTYPE_UNKNOWN:
519 : /* Should never happen. */
520 0 : elog(ERROR, "unknown worker type");
521 : }
522 :
523 846 : bgw.bgw_restart_time = BGW_NEVER_RESTART;
524 846 : bgw.bgw_notify_pid = MyProcPid;
525 846 : bgw.bgw_main_arg = Int32GetDatum(slot);
526 :
527 846 : if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
528 : {
529 : /* Failed to start worker, so clean up the worker slot. */
530 8 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
531 : Assert(generation == worker->generation);
532 8 : logicalrep_worker_cleanup(worker);
533 8 : LWLockRelease(LogicalRepWorkerLock);
534 :
535 8 : ereport(WARNING,
536 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
537 : errmsg("out of background worker slots"),
538 : errhint("You might need to increase \"%s\".", "max_worker_processes")));
539 8 : return false;
540 : }
541 :
542 : /* Now wait until it attaches. */
543 838 : return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
544 : }
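
The registration above is the generic dynamic-background-worker API; the only launcher-specific part is the custom wait. A minimal sketch of launching a dynamic worker from an extension, assuming a hypothetical my_ext library exporting my_worker_main():

#include "postgres.h"
#include "miscadmin.h"
#include "postmaster/bgworker.h"

static bool
launch_my_worker(void)
{
    BackgroundWorker bgw;
    BackgroundWorkerHandle *handle;
    pid_t       pid;

    memset(&bgw, 0, sizeof(bgw));
    bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
    bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
    bgw.bgw_restart_time = BGW_NEVER_RESTART;
    snprintf(bgw.bgw_library_name, MAXPGPATH, "my_ext");           /* hypothetical */
    snprintf(bgw.bgw_function_name, BGW_MAXLEN, "my_worker_main"); /* hypothetical */
    snprintf(bgw.bgw_name, BGW_MAXLEN, "my_ext worker");
    snprintf(bgw.bgw_type, BGW_MAXLEN, "my_ext worker");
    bgw.bgw_notify_pid = MyProcPid;     /* deliver SIGUSR1 on state change */

    if (!RegisterDynamicBackgroundWorker(&bgw, &handle))
        return false;       /* no free max_worker_processes slot */

    return WaitForBackgroundWorkerStartup(handle, &pid) == BGWH_STARTED;
}

launcher.c can't use the stock WaitForBackgroundWorkerStartup() here because it must additionally reclaim the shared LogicalRepWorker slot if the worker dies before attaching; that is what WaitForReplicationWorkerAttach() adds.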
545 :
546 : /*
547 : * Internal function to stop the worker and wait until it detaches from the
548 : * slot.
549 : */
550 : static void
551 162 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
552 : {
553 : uint16 generation;
554 :
555 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
556 :
557 : /*
558 : * Remember our worker's generation so we can later check whether the
559 : * slot still holds the same worker.
560 : */
561 162 : generation = worker->generation;
562 :
563 : /*
564 : * If we found a worker but it does not have proc set then it is still
565 : * starting up; wait for it to finish starting and then kill it.
566 : */
567 166 : while (worker->in_use && !worker->proc)
568 : {
569 : int rc;
570 :
571 10 : LWLockRelease(LogicalRepWorkerLock);
572 :
573 : /* Wait a bit --- we don't expect to have to wait long. */
574 10 : rc = WaitLatch(MyLatch,
575 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
576 : 10L, WAIT_EVENT_BGWORKER_STARTUP);
577 :
578 10 : if (rc & WL_LATCH_SET)
579 : {
580 2 : ResetLatch(MyLatch);
581 2 : CHECK_FOR_INTERRUPTS();
582 : }
583 :
584 : /* Recheck worker status. */
585 10 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
586 :
587 : /*
588 : * Check whether the worker slot is no longer used, which would mean
589 : * that the worker has exited, or whether the worker generation is
590 : * different, meaning that a different worker has taken the slot.
591 : */
592 10 : if (!worker->in_use || worker->generation != generation)
593 0 : return;
594 :
595 : /* Worker has assigned proc, so it has started. */
596 10 : if (worker->proc)
597 6 : break;
598 : }
599 :
600 : /* Now terminate the worker ... */
601 162 : kill(worker->proc->pid, signo);
602 :
603 : /* ... and wait for it to die. */
604 : for (;;)
605 216 : {
606 : int rc;
607 :
608 : /* is it gone? */
609 378 : if (!worker->proc || worker->generation != generation)
610 : break;
611 :
612 216 : LWLockRelease(LogicalRepWorkerLock);
613 :
614 : /* Wait a bit --- we don't expect to have to wait long. */
615 216 : rc = WaitLatch(MyLatch,
616 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
617 : 10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
618 :
619 216 : if (rc & WL_LATCH_SET)
620 : {
621 96 : ResetLatch(MyLatch);
622 96 : CHECK_FOR_INTERRUPTS();
623 : }
624 :
625 216 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
626 : }
627 : }
628 :
629 : /*
630 : * Stop the logical replication worker for subid/relid, if any.
631 : */
632 : void
633 188 : logicalrep_worker_stop(Oid subid, Oid relid)
634 : {
635 : LogicalRepWorker *worker;
636 :
637 188 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
638 :
639 188 : worker = logicalrep_worker_find(subid, relid, false);
640 :
641 188 : if (worker)
642 : {
643 : Assert(!isParallelApplyWorker(worker));
644 146 : logicalrep_worker_stop_internal(worker, SIGTERM);
645 : }
646 :
647 188 : LWLockRelease(LogicalRepWorkerLock);
648 188 : }
649 :
650 : /*
651 : * Stop the given logical replication parallel apply worker.
652 : *
653 : * Note that the function sends SIGINT instead of SIGTERM to the parallel apply
654 : * worker so that the worker exits cleanly.
655 : */
656 : void
657 10 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
658 : {
659 : int slot_no;
660 : uint16 generation;
661 : LogicalRepWorker *worker;
662 :
663 10 : SpinLockAcquire(&winfo->shared->mutex);
664 10 : generation = winfo->shared->logicalrep_worker_generation;
665 10 : slot_no = winfo->shared->logicalrep_worker_slot_no;
666 10 : SpinLockRelease(&winfo->shared->mutex);
667 :
668 : Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
669 :
670 : /*
671 : * Detach from the error_mq_handle for the parallel apply worker before
672 : * stopping it. This prevents the leader apply worker from trying to
673 : * receive the message from the error queue that might already be detached
674 : * by the parallel apply worker.
675 : */
676 10 : if (winfo->error_mq_handle)
677 : {
678 10 : shm_mq_detach(winfo->error_mq_handle);
679 10 : winfo->error_mq_handle = NULL;
680 : }
681 :
682 10 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
683 :
684 10 : worker = &LogicalRepCtx->workers[slot_no];
685 : Assert(isParallelApplyWorker(worker));
686 :
687 : /*
688 : * Only stop the worker if the generation matches and the worker is alive.
689 : */
690 10 : if (worker->generation == generation && worker->proc)
691 10 : logicalrep_worker_stop_internal(worker, SIGINT);
692 :
693 10 : LWLockRelease(LogicalRepWorkerLock);
694 10 : }
695 :
696 : /*
697 : * Wake up (using latch) any logical replication worker for specified sub/rel.
698 : */
699 : void
700 420 : logicalrep_worker_wakeup(Oid subid, Oid relid)
701 : {
702 : LogicalRepWorker *worker;
703 :
704 420 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
705 :
706 420 : worker = logicalrep_worker_find(subid, relid, true);
707 :
708 420 : if (worker)
709 420 : logicalrep_worker_wakeup_ptr(worker);
710 :
711 420 : LWLockRelease(LogicalRepWorkerLock);
712 420 : }
713 :
714 : /*
715 : * Wake up (using latch) the specified logical replication worker.
716 : *
717 : * Caller must hold lock, else worker->proc could change under us.
718 : */
719 : void
720 1280 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
721 : {
722 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
723 :
724 1280 : SetLatch(&worker->proc->procLatch);
725 1280 : }
726 :
727 : /*
728 : * Attach to a slot.
729 : */
730 : void
731 1084 : logicalrep_worker_attach(int slot)
732 : {
733 : /* Block concurrent access. */
734 1084 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
735 :
736 : Assert(slot >= 0 && slot < max_logical_replication_workers);
737 1084 : MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
738 :
739 1084 : if (!MyLogicalRepWorker->in_use)
740 : {
741 0 : LWLockRelease(LogicalRepWorkerLock);
742 0 : ereport(ERROR,
743 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
744 : errmsg("logical replication worker slot %d is empty, cannot attach",
745 : slot)));
746 : }
747 :
748 1084 : if (MyLogicalRepWorker->proc)
749 : {
750 0 : LWLockRelease(LogicalRepWorkerLock);
751 0 : ereport(ERROR,
752 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
753 : errmsg("logical replication worker slot %d is already used by "
754 : "another worker, cannot attach", slot)));
755 : }
756 :
757 1084 : MyLogicalRepWorker->proc = MyProc;
758 1084 : before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
759 :
760 1084 : LWLockRelease(LogicalRepWorkerLock);
761 1084 : }
762 :
763 : /*
764 : * Stop the parallel apply workers if any, and detach the leader apply worker
765 : * (cleans up the worker info).
766 : */
767 : static void
768 1084 : logicalrep_worker_detach(void)
769 : {
770 : /* Stop the parallel apply workers. */
771 1084 : if (am_leader_apply_worker())
772 : {
773 : List *workers;
774 : ListCell *lc;
775 :
776 : /*
777 : * Detach from the error_mq_handle for all parallel apply workers
778 : * before terminating them. This prevents the leader apply worker from
779 : * receiving the worker termination message and sending it to logs
780 : * when the same is already done by the parallel worker.
781 : */
782 672 : pa_detach_all_error_mq();
783 :
784 672 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
785 :
786 672 : workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
787 1352 : foreach(lc, workers)
788 : {
789 680 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
790 :
791 680 : if (isParallelApplyWorker(w))
792 6 : logicalrep_worker_stop_internal(w, SIGTERM);
793 : }
794 :
795 672 : LWLockRelease(LogicalRepWorkerLock);
796 :
797 672 : list_free(workers);
798 : }
799 :
800 : /* Block concurrent access. */
801 1084 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
802 :
803 1084 : logicalrep_worker_cleanup(MyLogicalRepWorker);
804 :
805 1084 : LWLockRelease(LogicalRepWorkerLock);
806 1084 : }
807 :
808 : /*
809 : * Clean up worker info.
810 : */
811 : static void
812 1092 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
813 : {
814 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
815 :
816 1092 : worker->type = WORKERTYPE_UNKNOWN;
817 1092 : worker->in_use = false;
818 1092 : worker->proc = NULL;
819 1092 : worker->dbid = InvalidOid;
820 1092 : worker->userid = InvalidOid;
821 1092 : worker->subid = InvalidOid;
822 1092 : worker->relid = InvalidOid;
823 1092 : worker->leader_pid = InvalidPid;
824 1092 : worker->parallel_apply = false;
825 1092 : }
826 :
827 : /*
828 : * Cleanup function for logical replication launcher.
829 : *
830 : * Called on logical replication launcher exit.
831 : */
832 : static void
833 844 : logicalrep_launcher_onexit(int code, Datum arg)
834 : {
835 844 : LogicalRepCtx->launcher_pid = 0;
836 844 : }
837 :
838 : /*
839 : * Cleanup function.
840 : *
841 : * Called on logical replication worker exit.
842 : */
843 : static void
844 1084 : logicalrep_worker_onexit(int code, Datum arg)
845 : {
846 : /* Disconnect gracefully from the remote side. */
847 1084 : if (LogRepWorkerWalRcvConn)
848 838 : walrcv_disconnect(LogRepWorkerWalRcvConn);
849 :
850 1084 : logicalrep_worker_detach();
851 :
852 : /* Cleanup fileset used for streaming transactions. */
853 1084 : if (MyLogicalRepWorker->stream_fileset != NULL)
854 28 : FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
855 :
856 : /*
857 : * Session level locks may be acquired outside of a transaction in
858 : * parallel apply mode and will not be released when the worker
859 : * terminates, so manually release all locks before the worker exits.
860 : *
861 : * The locks will be acquired once the worker is initialized.
862 : */
863 1084 : if (!InitializingApplyWorker)
864 958 : LockReleaseAll(DEFAULT_LOCKMETHOD, true);
865 :
866 1084 : ApplyLauncherWakeup();
867 1084 : }
868 :
869 : /*
870 : * Count the number of registered (not necessarily running) sync workers
871 : * for a subscription.
872 : */
873 : int
874 2484 : logicalrep_sync_worker_count(Oid subid)
875 : {
876 : int i;
877 2484 : int res = 0;
878 :
879 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
880 :
881 : /* Search for attached worker for a given subscription id. */
882 12824 : for (i = 0; i < max_logical_replication_workers; i++)
883 : {
884 10340 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
885 :
886 10340 : if (isTablesyncWorker(w) && w->subid == subid)
887 2640 : res++;
888 : }
889 :
890 2484 : return res;
891 : }
892 :
893 : /*
894 : * Count the number of registered (but not necessarily running) parallel apply
895 : * workers for a subscription.
896 : */
897 : static int
898 846 : logicalrep_pa_worker_count(Oid subid)
899 : {
900 : int i;
901 846 : int res = 0;
902 :
903 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
904 :
905 : /*
906 : * Scan all attached parallel apply workers, only counting those which
907 : * have the given subscription id.
908 : */
909 4478 : for (i = 0; i < max_logical_replication_workers; i++)
910 : {
911 3632 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
912 :
913 3632 : if (isParallelApplyWorker(w) && w->subid == subid)
914 4 : res++;
915 : }
916 :
917 846 : return res;
918 : }
919 :
920 : /*
921 : * ApplyLauncherShmemSize
922 : * Compute space needed for replication launcher shared memory
923 : */
924 : Size
925 8396 : ApplyLauncherShmemSize(void)
926 : {
927 : Size size;
928 :
929 : /*
930 : * Need the fixed struct and the array of LogicalRepWorker.
931 : */
932 8396 : size = sizeof(LogicalRepCtxStruct);
933 8396 : size = MAXALIGN(size);
934 8396 : size = add_size(size, mul_size(max_logical_replication_workers,
935 : sizeof(LogicalRepWorker)));
936 8396 : return size;
937 : }
938 :
939 : /*
940 : * ApplyLauncherRegister
941 : * Register a background worker running the logical replication launcher.
942 : */
943 : void
944 1740 : ApplyLauncherRegister(void)
945 : {
946 : BackgroundWorker bgw;
947 :
948 : /*
949 : * The logical replication launcher is disabled during binary upgrades, to
950 : * prevent logical replication workers from running on the source cluster.
951 : * That could cause replication origins to move forward after having been
952 : * copied to the target cluster, potentially creating conflicts with the
953 : * copied data files.
954 : */
955 1740 : if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
956 106 : return;
957 :
958 1634 : memset(&bgw, 0, sizeof(bgw));
959 1634 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
960 : BGWORKER_BACKEND_DATABASE_CONNECTION;
961 1634 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
962 1634 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
963 1634 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
964 1634 : snprintf(bgw.bgw_name, BGW_MAXLEN,
965 : "logical replication launcher");
966 1634 : snprintf(bgw.bgw_type, BGW_MAXLEN,
967 : "logical replication launcher");
968 1634 : bgw.bgw_restart_time = 5;
969 1634 : bgw.bgw_notify_pid = 0;
970 1634 : bgw.bgw_main_arg = (Datum) 0;
971 :
972 1634 : RegisterBackgroundWorker(&bgw);
973 : }
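
RegisterBackgroundWorker() only works while the postmaster is starting up, so extensions call it from _PG_init() under shared_preload_libraries. A sketch mirroring the registration above (the my_ext names are hypothetical):

#include "postgres.h"
#include "fmgr.h"
#include "postmaster/bgworker.h"

PG_MODULE_MAGIC;

void
_PG_init(void)
{
    BackgroundWorker bgw;

    memset(&bgw, 0, sizeof(bgw));
    bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
        BGWORKER_BACKEND_DATABASE_CONNECTION;
    bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
    bgw.bgw_restart_time = 5;   /* like the launcher: restart after 5s */
    snprintf(bgw.bgw_library_name, MAXPGPATH, "my_ext");        /* hypothetical */
    snprintf(bgw.bgw_function_name, BGW_MAXLEN, "my_ext_main"); /* hypothetical */
    snprintf(bgw.bgw_name, BGW_MAXLEN, "my_ext main worker");
    snprintf(bgw.bgw_type, BGW_MAXLEN, "my_ext main worker");

    RegisterBackgroundWorker(&bgw);
}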
974 :
975 : /*
976 : * ApplyLauncherShmemInit
977 : * Allocate and initialize replication launcher shared memory
978 : */
979 : void
980 2174 : ApplyLauncherShmemInit(void)
981 : {
982 : bool found;
983 :
984 2174 : LogicalRepCtx = (LogicalRepCtxStruct *)
985 2174 : ShmemInitStruct("Logical Replication Launcher Data",
986 : ApplyLauncherShmemSize(),
987 : &found);
988 :
989 2174 : if (!found)
990 : {
991 : int slot;
992 :
993 2174 : memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
994 :
995 2174 : LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
996 2174 : LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
997 :
998 : /* Initialize memory and spin locks for each worker slot. */
999 10796 : for (slot = 0; slot < max_logical_replication_workers; slot++)
1000 : {
1001 8622 : LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
1002 :
1003 8622 : memset(worker, 0, sizeof(LogicalRepWorker));
1004 8622 : SpinLockInit(&worker->relmutex);
1005 : }
1006 : }
1007 2174 : }
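
Extensions replicate this create-or-attach shape with the shmem hooks: reserve space at request time, then ShmemInitStruct() at startup and initialize only when the struct wasn't found. A minimal sketch under those assumptions (MyExtShared and the my_ext_* names are hypothetical):

#include "postgres.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"

typedef struct MyExtShared
{
    pid_t       leader_pid;     /* hypothetical payload */
} MyExtShared;

static MyExtShared *MyExtCtx = NULL;
static shmem_request_hook_type prev_shmem_request_hook = NULL;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;

/* In _PG_init(): save the previous hooks and install these two. */

static void
my_ext_shmem_request(void)
{
    if (prev_shmem_request_hook)
        prev_shmem_request_hook();

    RequestAddinShmemSpace(MAXALIGN(sizeof(MyExtShared)));
}

static void
my_ext_shmem_startup(void)
{
    bool        found;

    if (prev_shmem_startup_hook)
        prev_shmem_startup_hook();

    LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);

    MyExtCtx = (MyExtShared *)
        ShmemInitStruct("my_ext shared state", sizeof(MyExtShared), &found);
    if (!found)
        MyExtCtx->leader_pid = 0;   /* first attacher initializes */

    LWLockRelease(AddinShmemInitLock);
}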
1008 :
1009 : /*
1010 : * Initialize or attach to the dynamic shared hash table that stores the
1011 : * last-start times, if not already done.
1012 : * This must be called before accessing the table.
1013 : */
1014 : static void
1015 1554 : logicalrep_launcher_attach_dshmem(void)
1016 : {
1017 : MemoryContext oldcontext;
1018 :
1019 : /* Quick exit if we already did this. */
1020 1554 : if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
1021 1444 : last_start_times != NULL)
1022 1066 : return;
1023 :
1024 : /* Otherwise, use a lock to ensure only one process creates the table. */
1025 488 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
1026 :
1027 : /* Be sure any local memory allocated by DSA routines is persistent. */
1028 488 : oldcontext = MemoryContextSwitchTo(TopMemoryContext);
1029 :
1030 488 : if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
1031 : {
1032 : /* Initialize dynamic shared hash table for last-start times. */
1033 110 : last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
1034 110 : dsa_pin(last_start_times_dsa);
1035 110 : dsa_pin_mapping(last_start_times_dsa);
1036 110 : last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
1037 :
1038 : /* Store handles in shared memory for other backends to use. */
1039 110 : LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
1040 110 : LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
1041 : }
1042 378 : else if (!last_start_times)
1043 : {
1044 : /* Attach to existing dynamic shared hash table. */
1045 378 : last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
1046 378 : dsa_pin_mapping(last_start_times_dsa);
1047 378 : last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
1048 378 : LogicalRepCtx->last_start_dsh, NULL);
1049 : }
1050 :
1051 488 : MemoryContextSwitchTo(oldcontext);
1052 488 : LWLockRelease(LogicalRepWorkerLock);
1053 : }
1054 :
1055 : /*
1056 : * Set the last-start time for the subscription.
1057 : */
1058 : static void
1059 430 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
1060 : {
1061 : LauncherLastStartTimesEntry *entry;
1062 : bool found;
1063 :
1064 430 : logicalrep_launcher_attach_dshmem();
1065 :
1066 430 : entry = dshash_find_or_insert(last_start_times, &subid, &found);
1067 430 : entry->last_start_time = start_time;
1068 430 : dshash_release_lock(last_start_times, entry);
1069 430 : }
1070 :
1071 : /*
1072 : * Return the last-start time for the subscription, or 0 if there isn't one.
1073 : */
1074 : static TimestampTz
1075 668 : ApplyLauncherGetWorkerStartTime(Oid subid)
1076 : {
1077 : LauncherLastStartTimesEntry *entry;
1078 : TimestampTz ret;
1079 :
1080 668 : logicalrep_launcher_attach_dshmem();
1081 :
1082 668 : entry = dshash_find(last_start_times, &subid, false);
1083 668 : if (entry == NULL)
1084 264 : return 0;
1085 :
1086 404 : ret = entry->last_start_time;
1087 404 : dshash_release_lock(last_start_times, entry);
1088 :
1089 404 : return ret;
1090 : }
1091 :
1092 : /*
1093 : * Remove the last-start-time entry for the subscription, if one exists.
1094 : *
1095 : * This has two use-cases: to remove the entry related to a subscription
1096 : * that's been deleted or disabled (just to avoid leaking shared memory),
1097 : * and to allow immediate restart of an apply worker that has exited
1098 : * due to subscription parameter changes.
1099 : */
1100 : void
1101 456 : ApplyLauncherForgetWorkerStartTime(Oid subid)
1102 : {
1103 456 : logicalrep_launcher_attach_dshmem();
1104 :
1105 456 : (void) dshash_delete_key(last_start_times, &subid);
1106 456 : }
1107 :
1108 : /*
1109 : * Wakeup the launcher on commit if requested.
1110 : */
1111 : void
1112 1021192 : AtEOXact_ApplyLauncher(bool isCommit)
1113 : {
1114 1021192 : if (isCommit)
1115 : {
1116 970794 : if (on_commit_launcher_wakeup)
1117 286 : ApplyLauncherWakeup();
1118 : }
1119 :
1120 1021192 : on_commit_launcher_wakeup = false;
1121 1021192 : }
1122 :
1123 : /*
1124 : * Request wakeup of the launcher on commit of the transaction.
1125 : *
1126 : * This is used to signal the launcher to stop sleeping and process the
1127 : * subscriptions when the current transaction commits. Should be used when
1128 : * a new tuple is added to the pg_subscription catalog.
1129 : */
1130 : void
1131 288 : ApplyLauncherWakeupAtCommit(void)
1132 : {
1133 288 : if (!on_commit_launcher_wakeup)
1134 286 : on_commit_launcher_wakeup = true;
1135 288 : }
1136 :
1137 : /*
1138 : * Wakeup the launcher immediately.
1139 : */
1140 : void
1141 1448 : ApplyLauncherWakeup(void)
1142 : {
1143 1448 : if (LogicalRepCtx->launcher_pid != 0)
1144 1430 : kill(LogicalRepCtx->launcher_pid, SIGUSR1);
1145 1448 : }
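
(kill() with SIGUSR1 wakes the launcher because background workers run procsignal_sigusr1_handler by default, which ends by setting the process latch; that is what breaks the launcher out of the WaitLatch() call in ApplyLauncherMain() below.)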
1146 :
1147 : /*
1148 : * Main loop for the apply launcher process.
1149 : */
1150 : void
1151 844 : ApplyLauncherMain(Datum main_arg)
1152 : {
1153 844 : ereport(DEBUG1,
1154 : (errmsg_internal("logical replication launcher started")));
1155 :
1156 844 : before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
1157 :
1158 : Assert(LogicalRepCtx->launcher_pid == 0);
1159 844 : LogicalRepCtx->launcher_pid = MyProcPid;
1160 :
1161 : /* Establish signal handlers. */
1162 844 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
1163 844 : pqsignal(SIGTERM, die);
1164 844 : BackgroundWorkerUnblockSignals();
1165 :
1166 : /*
1167 : * Establish connection to nailed catalogs (we only ever access
1168 : * pg_subscription).
1169 : */
1170 844 : BackgroundWorkerInitializeConnection(NULL, NULL, 0);
1171 :
1172 : /*
1173 : * Acquire the conflict detection slot at startup to ensure it can be
1174 : * dropped if no longer needed after a restart.
1175 : */
1176 844 : acquire_conflict_slot_if_exists();
1177 :
1178 : /* Enter main loop */
1179 : for (;;)
1180 4856 : {
1181 : int rc;
1182 : List *sublist;
1183 : ListCell *lc;
1184 : MemoryContext subctx;
1185 : MemoryContext oldctx;
1186 5700 : long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
1187 5700 : bool can_update_xmin = true;
1188 5700 : bool retain_dead_tuples = false;
1189 5700 : TransactionId xmin = InvalidTransactionId;
1190 :
1191 5700 : CHECK_FOR_INTERRUPTS();
1192 :
1193 : /* Use temporary context to avoid leaking memory across cycles. */
1194 5696 : subctx = AllocSetContextCreate(TopMemoryContext,
1195 : "Logical Replication Launcher sublist",
1196 : ALLOCSET_DEFAULT_SIZES);
1197 5696 : oldctx = MemoryContextSwitchTo(subctx);
1198 :
1199 : /*
1200 : * Start any missing workers for enabled subscriptions.
1201 : *
1202 : * Also, during the iteration through all subscriptions, we compute
1203 : * the minimum XID required to protect deleted tuples for conflict
1204 : * detection if one of the subscriptions enables the retain_dead_tuples
1205 : * option.
1206 : */
1207 5696 : sublist = get_subscription_list();
1208 7528 : foreach(lc, sublist)
1209 : {
1210 1832 : Subscription *sub = (Subscription *) lfirst(lc);
1211 : LogicalRepWorker *w;
1212 : TimestampTz last_start;
1213 : TimestampTz now;
1214 : long elapsed;
1215 :
1216 1832 : if (sub->retaindeadtuples)
1217 : {
1218 136 : retain_dead_tuples = true;
1219 :
1220 : /*
1221 : * Create a replication slot to retain information necessary
1222 : * for conflict detection such as dead tuples, commit
1223 : * timestamps, and origins.
1224 : *
1225 : * The slot is created before starting the apply worker to
1226 : * prevent it from unnecessarily maintaining its
1227 : * oldest_nonremovable_xid.
1228 : *
1229 : * The slot is created even for a disabled subscription to
1230 : * ensure that conflict-related information is available when
1231 : * applying remote changes that occurred before the
1232 : * subscription was enabled.
1233 : */
1234 136 : CreateConflictDetectionSlot();
1235 :
1236 136 : if (sub->retentionactive)
1237 : {
1238 : /*
1239 : * Can't advance xmin of the slot unless all the
1240 : * subscriptions actively retaining dead tuples are
1241 : * enabled. This is required to ensure that we don't
1242 : * advance the xmin of CONFLICT_DETECTION_SLOT if one of
1243 : * the subscriptions is not enabled. Otherwise, we won't
1244 : * be able to detect conflicts reliably for such a
1245 : * subscription even though it has set the
1246 : * retain_dead_tuples option.
1247 : */
1248 136 : can_update_xmin &= sub->enabled;
1249 :
1250 : /*
1251 : * Initialize the slot once the subscription activates
1252 : * retention.
1253 : */
1254 136 : if (!TransactionIdIsValid(MyReplicationSlot->data.xmin))
1255 0 : init_conflict_slot_xmin();
1256 : }
1257 : }
1258 :
1259 1832 : if (!sub->enabled)
1260 76 : continue;
1261 :
1262 1756 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1263 1756 : w = logicalrep_worker_find(sub->oid, InvalidOid, false);
1264 1756 : LWLockRelease(LogicalRepWorkerLock);
1265 :
1266 1756 : if (w != NULL)
1267 : {
1268 : /*
1269 : * Compute the minimum xmin required to protect dead tuples
1270 : * required for conflict detection among all running apply
1271 : * workers.
1272 : */
1273 1088 : if (sub->retaindeadtuples &&
1274 128 : sub->retentionactive &&
1275 : can_update_xmin)
1276 128 : compute_min_nonremovable_xid(w, &xmin);
1277 :
1278 : /* worker is running already */
1279 1088 : continue;
1280 : }
1281 :
1282 : /*
1283 : * Can't advance the slot's xmin unless all the workers
1284 : * corresponding to subscriptions actively retaining dead tuples
1285 : * are running, so disable further computation of the minimum
1286 : * nonremovable xid.
1287 : */
1288 668 : if (sub->retaindeadtuples && sub->retentionactive)
1289 4 : can_update_xmin = false;
1290 :
1291 : /*
1292 : * If the worker is eligible to start now, launch it. Otherwise,
1293 : * adjust wait_time so that we'll wake up as soon as it can be
1294 : * started.
1295 : *
1296 : * Each subscription's apply worker can only be restarted once per
1297 : * wal_retrieve_retry_interval, so that errors do not cause us to
1298 : * repeatedly restart the worker as fast as possible. In cases
1299 : * where a restart is expected (e.g., subscription parameter
1300 : * changes), another process should remove the last-start entry
1301 : * for the subscription so that the worker can be restarted
1302 : * without waiting for wal_retrieve_retry_interval to elapse.
1303 : */
1304 668 : last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
1305 668 : now = GetCurrentTimestamp();
1306 668 : if (last_start == 0 ||
1307 404 : (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
1308 : {
1309 430 : ApplyLauncherSetWorkerStartTime(sub->oid, now);
1310 430 : if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
1311 430 : sub->dbid, sub->oid, sub->name,
1312 : sub->owner, InvalidOid,
1313 : DSM_HANDLE_INVALID,
1314 434 : sub->retaindeadtuples &&
1315 4 : sub->retentionactive))
1316 : {
1317 : /*
1318 : * We get here either if we failed to launch a worker
1319 : * (perhaps for resource-exhaustion reasons) or if we
1320 : * launched one but it immediately quit. Either way, it
1321 : * seems appropriate to try again after
1322 : * wal_retrieve_retry_interval.
1323 : */
1324 16 : wait_time = Min(wait_time,
1325 : wal_retrieve_retry_interval);
1326 : }
1327 : }
1328 : else
1329 : {
1330 238 : wait_time = Min(wait_time,
1331 : wal_retrieve_retry_interval - elapsed);
1332 : }
1333 : }
1334 :
1335 : /*
1336 : * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
1337 : * that requires us to retain dead tuples. Otherwise, if required,
1338 : * advance the slot's xmin to protect dead tuples required for the
1339 : * conflict detection.
1340 : *
1341 : * Additionally, if all apply workers for subscriptions with
1342 : * retain_dead_tuples enabled have requested to stop retention, the
1343 : * slot's xmin will be set to InvalidTransactionId allowing the
1344 : * removal of dead tuples.
1345 : */
1346 5696 : if (MyReplicationSlot)
1347 : {
1348 138 : if (!retain_dead_tuples)
1349 2 : ReplicationSlotDropAcquired();
1350 136 : else if (can_update_xmin)
1351 128 : update_conflict_slot_xmin(xmin);
1352 : }
1353 :
1354 : /* Switch back to original memory context. */
1355 5696 : MemoryContextSwitchTo(oldctx);
1356 : /* Clean the temporary memory. */
1357 5696 : MemoryContextDelete(subctx);
1358 :
1359 : /* Wait for more work. */
1360 5696 : rc = WaitLatch(MyLatch,
1361 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1362 : wait_time,
1363 : WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
1364 :
1365 5690 : if (rc & WL_LATCH_SET)
1366 : {
1367 5626 : ResetLatch(MyLatch);
1368 5626 : CHECK_FOR_INTERRUPTS();
1369 : }
1370 :
1371 4856 : if (ConfigReloadPending)
1372 : {
1373 72 : ConfigReloadPending = false;
1374 72 : ProcessConfigFile(PGC_SIGHUP);
1375 : }
1376 : }
1377 :
1378 : /* Not reachable */
1379 : }
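
As a worked example of the wait_time clamping above: with the default wal_retrieve_retry_interval of 5000 ms, if a subscription's worker died 3000 ms after its last start, the launcher skips the relaunch this cycle but sets wait_time = Min(180000, 5000 - 3000) = 2000 ms, so it wakes up exactly when the restart becomes permissible.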
1380 :
1381 : /*
1382 : * Determine the minimum non-removable transaction ID across all apply workers
1383 : * for subscriptions that have retain_dead_tuples enabled. Store the result
1384 : * in *xmin.
1385 : */
1386 : static void
1387 128 : compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
1388 : {
1389 : TransactionId nonremovable_xid;
1390 :
1391 : Assert(worker != NULL);
1392 :
1393 : /*
1394 : * The replication slot for conflict detection must be created before the
1395 : * worker starts.
1396 : */
1397 : Assert(MyReplicationSlot);
1398 :
1399 128 : SpinLockAcquire(&worker->relmutex);
1400 128 : nonremovable_xid = worker->oldest_nonremovable_xid;
1401 128 : SpinLockRelease(&worker->relmutex);
1402 :
1403 : /*
1404 : * Return if the apply worker has stopped retention concurrently.
1405 : *
1406 : * Although this function is invoked only when retentionactive is true,
1407 : * the apply worker might stop retention after the launcher fetches the
1408 : * retentionactive flag.
1409 : */
1410 128 : if (!TransactionIdIsValid(nonremovable_xid))
1411 0 : return;
1412 :
1413 128 : if (!TransactionIdIsValid(*xmin) ||
1414 0 : TransactionIdPrecedes(nonremovable_xid, *xmin))
1415 128 : *xmin = nonremovable_xid;
1416 : }
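
The minimum here is taken with TransactionIdPrecedes(), which compares in modulo-2^32 space rather than numerically: for two normal xids, e.g. TransactionIdPrecedes(4294967290, 5) is true because the unsigned difference wraps to a negative signed 32-bit value, so a pre-wraparound xid correctly remains the "older" (and thus minimum) one.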
1417 :
1418 : /*
1419 : * Acquire the replication slot used to retain information for conflict
1420 : * detection, if it exists.
1421 : *
1422 : * Return true if successfully acquired, otherwise return false.
1423 : */
1424 : static bool
1425 844 : acquire_conflict_slot_if_exists(void)
1426 : {
1427 844 : if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
1428 842 : return false;
1429 :
1430 2 : ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
1431 2 : return true;
1432 : }
1433 :
1434 : /*
1435 : * Update the xmin of the replication slot used to retain information required
1436 : * for conflict detection.
1437 : */
1438 : static void
1439 128 : update_conflict_slot_xmin(TransactionId new_xmin)
1440 : {
1441 : Assert(MyReplicationSlot);
1442 : Assert(!TransactionIdIsValid(new_xmin) ||
1443 : TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
1444 :
1445 : /* Nothing to do if the slot's xmin already has this value */
1446 128 : if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
1447 96 : return;
1448 :
1449 32 : SpinLockAcquire(&MyReplicationSlot->mutex);
1450 32 : MyReplicationSlot->effective_xmin = new_xmin;
1451 32 : MyReplicationSlot->data.xmin = new_xmin;
1452 32 : SpinLockRelease(&MyReplicationSlot->mutex);
1453 :
1454 32 : elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
1455 :
1456 32 : ReplicationSlotMarkDirty();
1457 32 : ReplicationSlotsComputeRequiredXmin(false);
1458 :
1459 : /*
1460 : * Like PhysicalConfirmReceivedLocation(), do not save slot information
1461 : * each time. This is acceptable because all concurrent transactions on
1462 : * the publisher that require the data preceding the slot's xmin should
1463 : * have already been applied and flushed on the subscriber before the xmin
1464 : * is advanced. So, even if the slot's xmin regresses after a restart, it
1465 : * will be advanced again in the next cycle. Therefore, no data required
1466 : * for conflict detection will be prematurely removed.
1467 : */
1468 32 : return;
1469 : }
1470 :
1471 : /*
1472 : * Initialize the xmin for the conflict detection slot.
1473 : */
1474 : static void
1475 8 : init_conflict_slot_xmin(void)
1476 : {
1477 : TransactionId xmin_horizon;
1478 :
1479 : /* Replication slot must exist but shouldn't be initialized. */
1480 : Assert(MyReplicationSlot &&
1481 : !TransactionIdIsValid(MyReplicationSlot->data.xmin));
1482 :
1483 8 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
1484 :
1485 8 : xmin_horizon = GetOldestSafeDecodingTransactionId(false);
1486 :
1487 8 : SpinLockAcquire(&MyReplicationSlot->mutex);
1488 8 : MyReplicationSlot->effective_xmin = xmin_horizon;
1489 8 : MyReplicationSlot->data.xmin = xmin_horizon;
1490 8 : SpinLockRelease(&MyReplicationSlot->mutex);
1491 :
1492 8 : ReplicationSlotsComputeRequiredXmin(true);
1493 :
1494 8 : LWLockRelease(ProcArrayLock);
1495 :
1496 : /* Write this slot to disk */
1497 8 : ReplicationSlotMarkDirty();
1498 8 : ReplicationSlotSave();
1499 8 : }
1500 :
1501 : /*
1502 : * Create and acquire the replication slot used to retain information for
1503 : * conflict detection, if not yet.
1504 : */
1505 : void
1506 138 : CreateConflictDetectionSlot(void)
1507 : {
1508 : /* Exit early, if the replication slot is already created and acquired */
1509 138 : if (MyReplicationSlot)
1510 130 : return;
1511 :
1512 8 : ereport(LOG,
1513 : errmsg("creating replication conflict detection slot"));
1514 :
1515 8 : ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
1516 : false, false);
1517 :
1518 8 : init_conflict_slot_xmin();
1519 : }
1520 :
1521 : /*
1522 : * Is current process the logical replication launcher?
1523 : */
1524 : bool
1525 4906 : IsLogicalLauncher(void)
1526 : {
1527 4906 : return LogicalRepCtx->launcher_pid == MyProcPid;
1528 : }
1529 :
1530 : /*
1531 : * Return the pid of the leader apply worker if the given pid is the pid of a
1532 : * parallel apply worker, otherwise, return InvalidPid.
1533 : */
1534 : pid_t
1535 1950 : GetLeaderApplyWorkerPid(pid_t pid)
1536 : {
1537 1950 : int leader_pid = InvalidPid;
1538 : int i;
1539 :
1540 1950 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1541 :
1542 9750 : for (i = 0; i < max_logical_replication_workers; i++)
1543 : {
1544 7800 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
1545 :
1546 7800 : if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
1547 : {
1548 0 : leader_pid = w->leader_pid;
1549 0 : break;
1550 : }
1551 : }
1552 :
1553 1950 : LWLockRelease(LogicalRepWorkerLock);
1554 :
1555 1950 : return leader_pid;
1556 : }
1557 :
1558 : /*
1559 : * Returns the state of the subscriptions.
1560 : */
1561 : Datum
1562 2 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
1563 : {
1564 : #define PG_STAT_GET_SUBSCRIPTION_COLS 10
1565 2 : Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
1566 : int i;
1567 2 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1568 :
1569 2 : InitMaterializedSRF(fcinfo, 0);
1570 :
1571 : /* Make sure we get consistent view of the workers. */
1572 2 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1573 :
1574 10 : for (i = 0; i < max_logical_replication_workers; i++)
1575 : {
1576 : /* for each row */
1577 8 : Datum values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1578 8 : bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1579 : int worker_pid;
1580 : LogicalRepWorker worker;
1581 :
1582 8 : memcpy(&worker, &LogicalRepCtx->workers[i],
1583 : sizeof(LogicalRepWorker));
1584 8 : if (!worker.proc || !IsBackendPid(worker.proc->pid))
1585 4 : continue;
1586 :
1587 4 : if (OidIsValid(subid) && worker.subid != subid)
1588 0 : continue;
1589 :
1590 4 : worker_pid = worker.proc->pid;
1591 :
1592 4 : values[0] = ObjectIdGetDatum(worker.subid);
1593 4 : if (isTablesyncWorker(&worker))
1594 0 : values[1] = ObjectIdGetDatum(worker.relid);
1595 : else
1596 4 : nulls[1] = true;
1597 4 : values[2] = Int32GetDatum(worker_pid);
1598 :
1599 4 : if (isParallelApplyWorker(&worker))
1600 0 : values[3] = Int32GetDatum(worker.leader_pid);
1601 : else
1602 4 : nulls[3] = true;
1603 :
1604 4 : if (XLogRecPtrIsInvalid(worker.last_lsn))
1605 0 : nulls[4] = true;
1606 : else
1607 4 : values[4] = LSNGetDatum(worker.last_lsn);
1608 4 : if (worker.last_send_time == 0)
1609 0 : nulls[5] = true;
1610 : else
1611 4 : values[5] = TimestampTzGetDatum(worker.last_send_time);
1612 4 : if (worker.last_recv_time == 0)
1613 0 : nulls[6] = true;
1614 : else
1615 4 : values[6] = TimestampTzGetDatum(worker.last_recv_time);
1616 4 : if (XLogRecPtrIsInvalid(worker.reply_lsn))
1617 0 : nulls[7] = true;
1618 : else
1619 4 : values[7] = LSNGetDatum(worker.reply_lsn);
1620 4 : if (worker.reply_time == 0)
1621 0 : nulls[8] = true;
1622 : else
1623 4 : values[8] = TimestampTzGetDatum(worker.reply_time);
1624 :
1625 4 : switch (worker.type)
1626 : {
1627 4 : case WORKERTYPE_APPLY:
1628 4 : values[9] = CStringGetTextDatum("apply");
1629 4 : break;
1630 0 : case WORKERTYPE_PARALLEL_APPLY:
1631 0 : values[9] = CStringGetTextDatum("parallel apply");
1632 0 : break;
1633 0 : case WORKERTYPE_TABLESYNC:
1634 0 : values[9] = CStringGetTextDatum("table synchronization");
1635 0 : break;
1636 0 : case WORKERTYPE_UNKNOWN:
1637 : /* Should never happen. */
1638 0 : elog(ERROR, "unknown worker type");
1639 : }
1640 :
1641 4 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1642 : values, nulls);
1643 :
1644 : /*
1645 : * If only a single subscription was requested, and we found it,
1646 : * break.
1647 : */
1648 4 : if (OidIsValid(subid))
1649 0 : break;
1650 : }
1651 :
1652 2 : LWLockRelease(LogicalRepWorkerLock);
1653 :
1654 2 : return (Datum) 0;
1655 : }
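
The function above follows the standard materialized SRF skeleton: InitMaterializedSRF() builds the tuplestore and result TupleDesc from the function's declared OUT columns, and each row is appended with tuplestore_putvalues(). A stripped-down sketch for a hypothetical two-column (oid, pid) statistics function:

#include "postgres.h"
#include "fmgr.h"
#include "funcapi.h"
#include "utils/tuplestore.h"

PG_MODULE_MAGIC;

PG_FUNCTION_INFO_V1(my_stat_view);      /* hypothetical SQL-callable function */

Datum
my_stat_view(PG_FUNCTION_ARGS)
{
    ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;

    /* Sets up rsinfo->setResult / rsinfo->setDesc for us. */
    InitMaterializedSRF(fcinfo, 0);

    for (int i = 0; i < 3; i++)     /* stand-in for scanning shared state */
    {
        Datum       values[2] = {0};
        bool        nulls[2] = {0};

        values[0] = ObjectIdGetDatum((Oid) (16384 + i));
        values[1] = Int32GetDatum(10000 + i);

        tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
                             values, nulls);
    }

    /* Materialized SRFs return their rows via the tuplestore. */
    return (Datum) 0;
}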