Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * launcher.c
3 : * PostgreSQL logical replication worker launcher process
4 : *
5 : * Copyright (c) 2016-2026, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/launcher.c
9 : *
10 : * NOTES
11 : * This module contains the logical replication worker launcher which
12 : * uses the background worker infrastructure to start the logical
13 : * replication workers for every enabled subscription.
14 : *
15 : *-------------------------------------------------------------------------
16 : */
17 :
18 : #include "postgres.h"
19 :
20 : #include "access/heapam.h"
21 : #include "access/htup.h"
22 : #include "access/htup_details.h"
23 : #include "access/tableam.h"
24 : #include "access/xact.h"
25 : #include "catalog/pg_subscription.h"
26 : #include "catalog/pg_subscription_rel.h"
27 : #include "funcapi.h"
28 : #include "lib/dshash.h"
29 : #include "miscadmin.h"
30 : #include "pgstat.h"
31 : #include "postmaster/bgworker.h"
32 : #include "postmaster/interrupt.h"
33 : #include "replication/logicallauncher.h"
34 : #include "replication/origin.h"
35 : #include "replication/slot.h"
36 : #include "replication/walreceiver.h"
37 : #include "replication/worker_internal.h"
38 : #include "storage/ipc.h"
39 : #include "storage/proc.h"
40 : #include "storage/procarray.h"
41 : #include "storage/subsystems.h"
42 : #include "tcop/tcopprot.h"
43 : #include "utils/builtins.h"
44 : #include "utils/memutils.h"
45 : #include "utils/pg_lsn.h"
46 : #include "utils/snapmgr.h"
47 : #include "utils/syscache.h"
48 : #include "utils/wait_event.h"
49 :
50 : /* max sleep time between cycles (3min) */
51 : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
52 :
53 : /* GUC variables */
54 : int max_logical_replication_workers = 4;
55 : int max_sync_workers_per_subscription = 2;
56 : int max_parallel_apply_workers_per_subscription = 2;
57 :
58 : LogicalRepWorker *MyLogicalRepWorker = NULL;
59 :
/*
 * Shared-memory state for the logical replication launcher and its workers.
 * Lives in shared memory; see ApplyLauncherShmemRequest()/ApplyLauncherShmemInit().
 */
typedef struct LogicalRepCtxStruct
{
	/* Supervisor process.  Reset to 0 on launcher exit (see
	 * logicalrep_launcher_onexit()). */
	pid_t		launcher_pid;

	/* Hash table holding last start times of subscriptions' apply workers. */
	dsa_handle	last_start_dsa;
	dshash_table_handle last_start_dsh;

	/* Background workers.  Array of max_logical_replication_workers slots. */
	LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
} LogicalRepCtxStruct;

/* Pointer to the shared struct above; wired up via ApplyLauncherShmemRequest(). */
static LogicalRepCtxStruct *LogicalRepCtx;
74 :
75 : static void ApplyLauncherShmemRequest(void *arg);
76 : static void ApplyLauncherShmemInit(void *arg);
77 :
78 : const ShmemCallbacks ApplyLauncherShmemCallbacks = {
79 : .request_fn = ApplyLauncherShmemRequest,
80 : .init_fn = ApplyLauncherShmemInit,
81 : };
82 :
83 : /* an entry in the last-start-times shared hash table */
84 : typedef struct LauncherLastStartTimesEntry
85 : {
86 : Oid subid; /* OID of logrep subscription (hash key) */
87 : TimestampTz last_start_time; /* last time its apply worker was started */
88 : } LauncherLastStartTimesEntry;
89 :
90 : /* parameters for the last-start-times shared hash table */
91 : static const dshash_parameters dsh_params = {
92 : sizeof(Oid),
93 : sizeof(LauncherLastStartTimesEntry),
94 : dshash_memcmp,
95 : dshash_memhash,
96 : dshash_memcpy,
97 : LWTRANCHE_LAUNCHER_HASH
98 : };
99 :
100 : static dsa_area *last_start_times_dsa = NULL;
101 : static dshash_table *last_start_times = NULL;
102 :
103 : static bool on_commit_launcher_wakeup = false;
104 :
105 :
106 : static void logicalrep_launcher_onexit(int code, Datum arg);
107 : static void logicalrep_worker_onexit(int code, Datum arg);
108 : static void logicalrep_worker_detach(void);
109 : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
110 : static int logicalrep_pa_worker_count(Oid subid);
111 : static void logicalrep_launcher_attach_dshmem(void);
112 : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
113 : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
114 : static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
115 : static bool acquire_conflict_slot_if_exists(void);
116 : static void update_conflict_slot_xmin(TransactionId new_xmin);
117 : static void init_conflict_slot_xmin(void);
118 :
119 :
/*
 * Load the list of subscriptions.
 *
 * Only the fields interesting for worker start/stop functions are filled for
 * each subscription.
 *
 * Returns a List of Subscription structs allocated in the caller's memory
 * context, so the result outlives the transaction started (and committed)
 * inside this function.
 */
static List *
get_subscription_list(void)
{
	List	   *res = NIL;
	Relation	rel;
	TableScanDesc scan;
	HeapTuple	tup;
	MemoryContext resultcxt;

	/* This is the context that we will allocate our output data in */
	resultcxt = CurrentMemoryContext;

	/*
	 * Start a transaction so we can access pg_subscription.
	 */
	StartTransactionCommand();

	rel = table_open(SubscriptionRelationId, AccessShareLock);
	scan = table_beginscan_catalog(rel, 0, NULL);

	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
	{
		Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
		Subscription *sub;
		MemoryContext oldcxt;

		/*
		 * Allocate our results in the caller's context, not the
		 * transaction's. We do this inside the loop, and restore the original
		 * context at the end, so that leaky things like heap_getnext() are
		 * not called in a potentially long-lived context.
		 */
		oldcxt = MemoryContextSwitchTo(resultcxt);

		sub = palloc0_object(Subscription);
		sub->oid = subform->oid;
		sub->dbid = subform->subdbid;
		sub->owner = subform->subowner;
		sub->enabled = subform->subenabled;
		sub->name = pstrdup(NameStr(subform->subname));
		sub->retaindeadtuples = subform->subretaindeadtuples;
		sub->retentionactive = subform->subretentionactive;
		/* We don't fill fields we are not interested in. */

		res = lappend(res, sub);
		MemoryContextSwitchTo(oldcxt);
	}

	table_endscan(scan);
	table_close(rel, AccessShareLock);

	/* Committing here is fine: the result list lives in resultcxt. */
	CommitTransactionCommand();

	return res;
}
181 :
/*
 * Wait for a background worker to start up and attach to the shmem context.
 *
 * This is only needed for cleaning up the shared memory in case the worker
 * fails to attach.
 *
 * Returns whether the attach was successful.
 */
static bool
WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
							   uint16 generation,
							   BackgroundWorkerHandle *handle)
{
	bool		result = false;
	bool		dropped_latch = false;

	for (;;)
	{
		BgwHandleStatus status;
		pid_t		pid;
		int			rc;

		CHECK_FOR_INTERRUPTS();

		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

		/* Worker either died or has started. Return false if died. */
		if (!worker->in_use || worker->proc)
		{
			/*
			 * in_use is still true iff the worker attached; if it already
			 * cleaned up its slot, report failure.
			 */
			result = worker->in_use;
			LWLockRelease(LogicalRepWorkerLock);
			break;
		}

		LWLockRelease(LogicalRepWorkerLock);

		/* Check if worker has died before attaching, and clean up after it. */
		status = GetBackgroundWorkerPid(handle, &pid);

		if (status == BGWH_STOPPED)
		{
			LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
			/* Ensure that this was indeed the worker we waited for. */
			if (generation == worker->generation)
				logicalrep_worker_cleanup(worker);
			LWLockRelease(LogicalRepWorkerLock);
			break;				/* result is already false */
		}

		/*
		 * We need timeout because we generally don't get notified via latch
		 * about the worker attach. But we don't expect to have to wait long.
		 */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   10L, WAIT_EVENT_BGWORKER_STARTUP);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
			/* Remember we consumed a latch event meant for someone else. */
			dropped_latch = true;
		}
	}

	/*
	 * If we had to clear a latch event in order to wait, be sure to restore
	 * it before exiting. Otherwise caller may miss events.
	 */
	if (dropped_latch)
		SetLatch(MyLatch);

	return result;
}
256 :
257 : /*
258 : * Walks the workers array and searches for one that matches given worker type,
259 : * subscription id, and relation id.
260 : *
261 : * For both apply workers and sequencesync workers, the relid should be set to
262 : * InvalidOid, as these workers handle changes across all tables and sequences
263 : * respectively, rather than targeting a specific relation. For tablesync
264 : * workers, the relid should be set to the OID of the relation being
265 : * synchronized.
266 : */
267 : LogicalRepWorker *
268 3328 : logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid,
269 : bool only_running)
270 : {
271 : int i;
272 3328 : LogicalRepWorker *res = NULL;
273 :
274 : /* relid must be valid only for table sync workers */
275 : Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
276 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
277 :
278 : /* Search for an attached worker that matches the specified criteria. */
279 10038 : for (i = 0; i < max_logical_replication_workers; i++)
280 : {
281 8774 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
282 :
283 : /* Skip parallel apply workers. */
284 8774 : if (isParallelApplyWorker(w))
285 0 : continue;
286 :
287 8774 : if (w->in_use && w->subid == subid && w->relid == relid &&
288 2090 : w->type == wtype && (!only_running || w->proc))
289 : {
290 2064 : res = w;
291 2064 : break;
292 : }
293 : }
294 :
295 3328 : return res;
296 : }
297 :
298 : /*
299 : * Similar to logicalrep_worker_find(), but returns a list of all workers for
300 : * the subscription, instead of just one.
301 : */
302 : List *
303 802 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
304 : {
305 : int i;
306 802 : List *res = NIL;
307 :
308 802 : if (acquire_lock)
309 146 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
310 :
311 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
312 :
313 : /* Search for attached worker for a given subscription id. */
314 4152 : for (i = 0; i < max_logical_replication_workers; i++)
315 : {
316 3350 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
317 :
318 3350 : if (w->in_use && w->subid == subid && (!only_running || w->proc))
319 543 : res = lappend(res, w);
320 : }
321 :
322 802 : if (acquire_lock)
323 146 : LWLockRelease(LogicalRepWorkerLock);
324 :
325 802 : return res;
326 : }
327 :
/*
 * Start new logical replication background worker, if possible.
 *
 * Returns true on success, false on failure.  Failure modes that return
 * false silently or with a WARNING: no free worker slot, per-subscription
 * sync/parallel-apply worker limits reached, or no free background worker
 * process slot.
 */
bool
logicalrep_worker_launch(LogicalRepWorkerType wtype,
						 Oid dbid, Oid subid, const char *subname, Oid userid,
						 Oid relid, dsm_handle subworker_dsm,
						 bool retain_dead_tuples)
{
	BackgroundWorker bgw;
	BackgroundWorkerHandle *bgw_handle;
	uint16		generation;
	int			i;
	int			slot = 0;
	LogicalRepWorker *worker = NULL;
	int			nsyncworkers;
	int			nparallelapplyworkers;
	TimestampTz now;
	bool		is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
	bool		is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC);
	bool		is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);

	/*----------
	 * Sanity checks:
	 * - must be valid worker type
	 * - tablesync workers are only ones to have relid
	 * - parallel apply worker is the only kind of subworker
	 * - The replication slot used in conflict detection is created when
	 *   retain_dead_tuples is enabled
	 */
	Assert(wtype != WORKERTYPE_UNKNOWN);
	Assert(is_tablesync_worker == OidIsValid(relid));
	Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
	Assert(!retain_dead_tuples || MyReplicationSlot);

	ereport(DEBUG1,
			(errmsg_internal("starting logical replication worker for subscription \"%s\"",
							 subname)));

	/* Report this after the initial starting message for consistency. */
	if (max_active_replication_origins == 0)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));

	/*
	 * We need to do the modification of the shared memory under lock so that
	 * we have consistent view.
	 */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

retry:
	/* Find unused worker slot. */
	for (i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		if (!w->in_use)
		{
			worker = w;
			slot = i;
			break;
		}
	}

	nsyncworkers = logicalrep_sync_worker_count(subid);

	now = GetCurrentTimestamp();

	/*
	 * If we didn't find a free slot, try to do garbage collection.  The
	 * reason we do this is because if some worker failed to start up and its
	 * parent has crashed while waiting, the in_use state was never cleared.
	 */
	if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
	{
		bool		did_cleanup = false;

		for (i = 0; i < max_logical_replication_workers; i++)
		{
			LogicalRepWorker *w = &LogicalRepCtx->workers[i];

			/*
			 * If the worker was marked in use but didn't manage to attach in
			 * time, clean it up.  wal_receiver_timeout bounds how long a
			 * not-yet-attached slot may stay claimed.
			 */
			if (w->in_use && !w->proc &&
				TimestampDifferenceExceeds(w->launch_time, now,
										   wal_receiver_timeout))
			{
				elog(WARNING,
					 "logical replication worker for subscription %u took too long to start; canceled",
					 w->subid);

				logicalrep_worker_cleanup(w);
				did_cleanup = true;
			}
		}

		/* Freed at least one slot, so repeat the slot search. */
		if (did_cleanup)
			goto retry;
	}

	/*
	 * We don't allow to invoke more sync workers once we have reached the
	 * sync worker limit per subscription.  So, just return silently as we
	 * might get here because of an otherwise harmless race condition.
	 */
	if ((is_tablesync_worker || is_sequencesync_worker) &&
		nsyncworkers >= max_sync_workers_per_subscription)
	{
		LWLockRelease(LogicalRepWorkerLock);
		return false;
	}

	nparallelapplyworkers = logicalrep_pa_worker_count(subid);

	/*
	 * Return false if the number of parallel apply workers reached the limit
	 * per subscription.
	 */
	if (is_parallel_apply_worker &&
		nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
	{
		LWLockRelease(LogicalRepWorkerLock);
		return false;
	}

	/*
	 * However if there are no more free worker slots, inform user about it
	 * before exiting.
	 */
	if (worker == NULL)
	{
		LWLockRelease(LogicalRepWorkerLock);
		ereport(WARNING,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("out of logical replication worker slots"),
				 errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
		return false;
	}

	/* Prepare the worker slot. */
	worker->type = wtype;
	worker->launch_time = now;
	worker->in_use = true;
	worker->generation++;
	worker->proc = NULL;
	worker->dbid = dbid;
	worker->userid = userid;
	worker->subid = subid;
	worker->relid = relid;
	worker->relstate = SUBREL_STATE_UNKNOWN;
	worker->relstate_lsn = InvalidXLogRecPtr;
	worker->stream_fileset = NULL;
	worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
	worker->parallel_apply = is_parallel_apply_worker;
	/* Seed the nonremovable xid from the conflict slot when retaining tuples. */
	worker->oldest_nonremovable_xid = retain_dead_tuples
		? MyReplicationSlot->data.xmin
		: InvalidTransactionId;
	worker->last_lsn = InvalidXLogRecPtr;
	TIMESTAMP_NOBEGIN(worker->last_send_time);
	TIMESTAMP_NOBEGIN(worker->last_recv_time);
	worker->reply_lsn = InvalidXLogRecPtr;
	TIMESTAMP_NOBEGIN(worker->reply_time);
	worker->last_seqsync_start_time = 0;

	/* Before releasing lock, remember generation for future identification. */
	generation = worker->generation;

	LWLockRelease(LogicalRepWorkerLock);

	/* Register the new dynamic worker. */
	memset(&bgw, 0, sizeof(bgw));
	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
		BGWORKER_BACKEND_DATABASE_CONNECTION;
	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");

	switch (worker->type)
	{
		case WORKERTYPE_APPLY:
			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
			snprintf(bgw.bgw_name, BGW_MAXLEN,
					 "logical replication apply worker for subscription %u",
					 subid);
			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
			break;

		case WORKERTYPE_PARALLEL_APPLY:
			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
			snprintf(bgw.bgw_name, BGW_MAXLEN,
					 "logical replication parallel apply worker for subscription %u",
					 subid);
			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");

			/* Pass the shared-memory segment handle through bgw_extra. */
			memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
			break;

		case WORKERTYPE_SEQUENCESYNC:
			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain");
			snprintf(bgw.bgw_name, BGW_MAXLEN,
					 "logical replication sequencesync worker for subscription %u",
					 subid);
			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker");
			break;

		case WORKERTYPE_TABLESYNC:
			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain");
			snprintf(bgw.bgw_name, BGW_MAXLEN,
					 "logical replication tablesync worker for subscription %u sync %u",
					 subid,
					 relid);
			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
			break;

		case WORKERTYPE_UNKNOWN:
			/* Should never happen. */
			elog(ERROR, "unknown worker type");
	}

	bgw.bgw_restart_time = BGW_NEVER_RESTART;
	bgw.bgw_notify_pid = MyProcPid;
	bgw.bgw_main_arg = Int32GetDatum(slot);

	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
	{
		/* Failed to start worker, so clean up the worker slot. */
		LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
		Assert(generation == worker->generation);
		logicalrep_worker_cleanup(worker);
		LWLockRelease(LogicalRepWorkerLock);

		ereport(WARNING,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("out of background worker slots"),
				 errhint("You might need to increase \"%s\".", "max_worker_processes")));
		return false;
	}

	/* Now wait until it attaches. */
	return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
}
573 :
/*
 * Internal function to stop the worker and wait until it detaches from the
 * slot.
 *
 * Caller must hold LogicalRepWorkerLock in LW_SHARED mode; the lock is
 * transiently released while waiting and re-acquired, and is held again on
 * return.
 */
static void
logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
{
	uint16		generation;

	Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));

	/*
	 * Remember which generation was our worker so we can check if what we see
	 * is still the same one.
	 */
	generation = worker->generation;

	/*
	 * If we found a worker but it does not have proc set then it is still
	 * starting up; wait for it to finish starting and then kill it.
	 */
	while (worker->in_use && !worker->proc)
	{
		int			rc;

		LWLockRelease(LogicalRepWorkerLock);

		/* Wait a bit --- we don't expect to have to wait long. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   10L, WAIT_EVENT_BGWORKER_STARTUP);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		/* Recheck worker status. */
		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

		/*
		 * Check whether the worker slot is no longer used, which would mean
		 * that the worker has exited, or whether the worker generation is
		 * different, meaning that a different worker has taken the slot.
		 */
		if (!worker->in_use || worker->generation != generation)
			return;

		/* Worker has assigned proc, so it has started. */
		if (worker->proc)
			break;
	}

	/* Now terminate the worker ... */
	kill(worker->proc->pid, signo);

	/* ... and wait for it to die. */
	for (;;)
	{
		int			rc;

		/* is it gone?  (generation change means the slot was recycled) */
		if (!worker->proc || worker->generation != generation)
			break;

		LWLockRelease(LogicalRepWorkerLock);

		/* Wait a bit --- we don't expect to have to wait long. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   10L, WAIT_EVENT_BGWORKER_SHUTDOWN);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
	}
}
656 :
657 : /*
658 : * Stop the logical replication worker that matches the specified worker type,
659 : * subscription id, and relation id.
660 : */
661 : void
662 100 : logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
663 : {
664 : LogicalRepWorker *worker;
665 :
666 : /* relid must be valid only for table sync workers */
667 : Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
668 :
669 100 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
670 :
671 100 : worker = logicalrep_worker_find(wtype, subid, relid, false);
672 :
673 100 : if (worker)
674 : {
675 : Assert(!isParallelApplyWorker(worker));
676 78 : logicalrep_worker_stop_internal(worker, SIGTERM);
677 : }
678 :
679 100 : LWLockRelease(LogicalRepWorkerLock);
680 100 : }
681 :
/*
 * Stop the given logical replication parallel apply worker.
 *
 * Note that the function sends SIGUSR2 instead of SIGTERM to the parallel
 * apply worker so that the worker exits cleanly.
 */
void
logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
{
	int			slot_no;
	uint16		generation;
	LogicalRepWorker *worker;

	/* Read the worker's slot identity under its spinlock. */
	SpinLockAcquire(&winfo->shared->mutex);
	generation = winfo->shared->logicalrep_worker_generation;
	slot_no = winfo->shared->logicalrep_worker_slot_no;
	SpinLockRelease(&winfo->shared->mutex);

	Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);

	/*
	 * Detach from the error_mq_handle for the parallel apply worker before
	 * stopping it. This prevents the leader apply worker from trying to
	 * receive the message from the error queue that might already be detached
	 * by the parallel apply worker.
	 */
	if (winfo->error_mq_handle)
	{
		shm_mq_detach(winfo->error_mq_handle);
		winfo->error_mq_handle = NULL;
	}

	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

	worker = &LogicalRepCtx->workers[slot_no];
	Assert(isParallelApplyWorker(worker));

	/*
	 * Only stop the worker if the generation matches and the worker is alive.
	 * A generation mismatch means the slot has been recycled for some other
	 * worker since we captured it above.
	 */
	if (worker->generation == generation && worker->proc)
		logicalrep_worker_stop_internal(worker, SIGUSR2);

	LWLockRelease(LogicalRepWorkerLock);
}
727 :
728 : /*
729 : * Wake up (using latch) any logical replication worker that matches the
730 : * specified worker type, subscription id, and relation id.
731 : */
732 : void
733 230 : logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
734 : {
735 : LogicalRepWorker *worker;
736 :
737 : /* relid must be valid only for table sync workers */
738 : Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
739 :
740 230 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
741 :
742 230 : worker = logicalrep_worker_find(wtype, subid, relid, true);
743 :
744 230 : if (worker)
745 230 : logicalrep_worker_wakeup_ptr(worker);
746 :
747 230 : LWLockRelease(LogicalRepWorkerLock);
748 230 : }
749 :
/*
 * Wake up (using latch) the specified logical replication worker.
 *
 * Caller must hold lock, else worker->proc could change under us.
 */
void
logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
{
	Assert(LWLockHeldByMe(LogicalRepWorkerLock));

	SetLatch(&worker->proc->procLatch);
}
762 :
/*
 * Attach to a slot.
 *
 * Called by a freshly started worker process; slot is the array index that
 * was passed to it via bgw_main_arg.  Errors out if the slot was never
 * claimed or is already owned by another process.
 */
void
logicalrep_worker_attach(int slot)
{
	/* Block concurrent access. */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

	Assert(slot >= 0 && slot < max_logical_replication_workers);
	MyLogicalRepWorker = &LogicalRepCtx->workers[slot];

	if (!MyLogicalRepWorker->in_use)
	{
		/* Release the lock before erroring out of this process. */
		LWLockRelease(LogicalRepWorkerLock);
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("logical replication worker slot %d is empty, cannot attach",
						slot)));
	}

	if (MyLogicalRepWorker->proc)
	{
		LWLockRelease(LogicalRepWorkerLock);
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("logical replication worker slot %d is already used by "
						"another worker, cannot attach", slot)));
	}

	/* Claim the slot and arrange for cleanup when this process exits. */
	MyLogicalRepWorker->proc = MyProc;
	before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);

	LWLockRelease(LogicalRepWorkerLock);
}
798 :
/*
 * Stop the parallel apply workers if any, and detach the leader apply worker
 * (cleans up the worker info).
 */
static void
logicalrep_worker_detach(void)
{
	/* Stop the parallel apply workers. */
	if (am_leader_apply_worker())
	{
		List	   *workers;
		ListCell   *lc;

		/*
		 * Detach from the error_mq_handle for all parallel apply workers
		 * before terminating them. This prevents the leader apply worker from
		 * receiving the worker termination message and sending it to logs
		 * when the same is already done by the parallel worker.
		 */
		pa_detach_all_error_mq();

		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

		/* Find all running workers of our subscription (only_running = true). */
		workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
		foreach(lc, workers)
		{
			LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);

			if (isParallelApplyWorker(w))
				logicalrep_worker_stop_internal(w, SIGTERM);
		}

		LWLockRelease(LogicalRepWorkerLock);

		list_free(workers);
	}

	/* Block concurrent access. */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

	logicalrep_worker_cleanup(MyLogicalRepWorker);

	LWLockRelease(LogicalRepWorkerLock);
}
843 :
844 : /*
845 : * Clean up worker info.
846 : */
847 : static void
848 625 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
849 : {
850 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
851 :
852 625 : worker->type = WORKERTYPE_UNKNOWN;
853 625 : worker->in_use = false;
854 625 : worker->proc = NULL;
855 625 : worker->dbid = InvalidOid;
856 625 : worker->userid = InvalidOid;
857 625 : worker->subid = InvalidOid;
858 625 : worker->relid = InvalidOid;
859 625 : worker->leader_pid = InvalidPid;
860 625 : worker->parallel_apply = false;
861 625 : }
862 :
/*
 * Cleanup function for logical replication launcher.
 *
 * Called on logical replication launcher exit.  Clearing launcher_pid in
 * shared memory marks the launcher as no longer running.
 */
static void
logicalrep_launcher_onexit(int code, Datum arg)
{
	LogicalRepCtx->launcher_pid = 0;
}
873 :
874 : /*
875 : * Reset the last_seqsync_start_time of the sequencesync worker in the
876 : * subscription's apply worker.
877 : *
878 : * Note that this value is not stored in the sequencesync worker, because that
879 : * has finished already and is about to exit.
880 : */
881 : void
882 5 : logicalrep_reset_seqsync_start_time(void)
883 : {
884 : LogicalRepWorker *worker;
885 :
886 : /*
887 : * The apply worker can't access last_seqsync_start_time concurrently, so
888 : * it is okay to use SHARED lock here. See ProcessSequencesForSync().
889 : */
890 5 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
891 :
892 5 : worker = logicalrep_worker_find(WORKERTYPE_APPLY,
893 5 : MyLogicalRepWorker->subid, InvalidOid,
894 : true);
895 5 : if (worker)
896 5 : worker->last_seqsync_start_time = 0;
897 :
898 5 : LWLockRelease(LogicalRepWorkerLock);
899 5 : }
900 :
/*
 * Cleanup function.
 *
 * Called on logical replication worker exit (registered via
 * before_shmem_exit() in logicalrep_worker_attach()).
 */
static void
logicalrep_worker_onexit(int code, Datum arg)
{
	/* Disconnect gracefully from the remote side. */
	if (LogRepWorkerWalRcvConn)
		walrcv_disconnect(LogRepWorkerWalRcvConn);

	logicalrep_worker_detach();

	/* Cleanup fileset used for streaming transactions. */
	if (MyLogicalRepWorker->stream_fileset != NULL)
		FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);

	/*
	 * Session level locks may be acquired outside of a transaction in
	 * parallel apply mode and will not be released when the worker
	 * terminates, so manually release all locks before the worker exits.
	 *
	 * The locks will be acquired once the worker is initialized.
	 */
	if (!InitializingApplyWorker)
		LockReleaseAll(DEFAULT_LOCKMETHOD, true);

	/* Let the launcher know a slot has freed up. */
	ApplyLauncherWakeup();
}
931 :
932 : /*
933 : * Count the number of registered (not necessarily running) sync workers
934 : * for a subscription.
935 : */
936 : int
937 1343 : logicalrep_sync_worker_count(Oid subid)
938 : {
939 : int i;
940 1343 : int res = 0;
941 :
942 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
943 :
944 : /* Search for attached worker for a given subscription id. */
945 6923 : for (i = 0; i < max_logical_replication_workers; i++)
946 : {
947 5580 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
948 :
949 5580 : if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w)))
950 1350 : res++;
951 : }
952 :
953 1343 : return res;
954 : }
955 :
956 : /*
957 : * Count the number of registered (but not necessarily running) parallel apply
958 : * workers for a subscription.
959 : */
960 : static int
961 473 : logicalrep_pa_worker_count(Oid subid)
962 : {
963 : int i;
964 473 : int res = 0;
965 :
966 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
967 :
968 : /*
969 : * Scan all attached parallel apply workers, only counting those which
970 : * have the given subscription id.
971 : */
972 2491 : for (i = 0; i < max_logical_replication_workers; i++)
973 : {
974 2018 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
975 :
976 2018 : if (isParallelApplyWorker(w) && w->subid == subid)
977 2 : res++;
978 : }
979 :
980 473 : return res;
981 : }
982 :
983 : /*
984 : * ApplyLauncherShmemRequest
985 : * Register shared memory space needed for replication launcher
986 : */
987 : static void
988 1233 : ApplyLauncherShmemRequest(void *arg)
989 : {
990 : Size size;
991 :
992 : /*
993 : * Need the fixed struct and the array of LogicalRepWorker.
994 : */
995 1233 : size = sizeof(LogicalRepCtxStruct);
996 1233 : size = MAXALIGN(size);
997 1233 : size = add_size(size, mul_size(max_logical_replication_workers,
998 : sizeof(LogicalRepWorker)));
999 1233 : ShmemRequestStruct(.name = "Logical Replication Launcher Data",
1000 : .size = size,
1001 : .ptr = (void **) &LogicalRepCtx,
1002 : );
1003 1233 : }
1004 :
1005 : /*
1006 : * ApplyLauncherRegister
1007 : * Register a background worker running the logical replication launcher.
1008 : */
1009 : void
1010 993 : ApplyLauncherRegister(void)
1011 : {
1012 : BackgroundWorker bgw;
1013 :
1014 : /*
1015 : * The logical replication launcher is disabled during binary upgrades, to
1016 : * prevent logical replication workers from running on the source cluster.
1017 : * That could cause replication origins to move forward after having been
1018 : * copied to the target cluster, potentially creating conflicts with the
1019 : * copied data files.
1020 : */
1021 993 : if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
1022 61 : return;
1023 :
1024 932 : memset(&bgw, 0, sizeof(bgw));
1025 932 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
1026 : BGWORKER_BACKEND_DATABASE_CONNECTION;
1027 932 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
1028 932 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
1029 932 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
1030 932 : snprintf(bgw.bgw_name, BGW_MAXLEN,
1031 : "logical replication launcher");
1032 932 : snprintf(bgw.bgw_type, BGW_MAXLEN,
1033 : "logical replication launcher");
1034 932 : bgw.bgw_restart_time = 5;
1035 932 : bgw.bgw_notify_pid = 0;
1036 932 : bgw.bgw_main_arg = (Datum) 0;
1037 :
1038 932 : RegisterBackgroundWorker(&bgw);
1039 : }
1040 :
1041 : /*
1042 : * ApplyLauncherShmemInit
1043 : * Initialize replication launcher shared memory
1044 : */
1045 : static void
1046 1230 : ApplyLauncherShmemInit(void *arg)
1047 : {
1048 : int slot;
1049 :
1050 1230 : LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
1051 1230 : LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
1052 :
1053 : /* Initialize memory and spin locks for each worker slot. */
1054 6113 : for (slot = 0; slot < max_logical_replication_workers; slot++)
1055 : {
1056 4883 : LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
1057 :
1058 4883 : memset(worker, 0, sizeof(LogicalRepWorker));
1059 4883 : SpinLockInit(&worker->relmutex);
1060 : }
1061 1230 : }
1062 :
/*
 * Initialize or attach to the dynamic shared hash table that stores the
 * last-start times, if not already done.
 * This must be called before accessing the table.
 *
 * On return, last_start_times (and last_start_times_dsa) point at the
 * shared table; the handles stored in LogicalRepCtx let other backends
 * attach to the same table later.
 */
static void
logicalrep_launcher_attach_dshmem(void)
{
	MemoryContext oldcontext;

	/* Quick exit if we already did this. */
	if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
		last_start_times != NULL)
		return;

	/* Otherwise, use a lock to ensure only one process creates the table. */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

	/* Be sure any local memory allocated by DSA routines is persistent. */
	oldcontext = MemoryContextSwitchTo(TopMemoryContext);

	if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
	{
		/* Initialize dynamic shared hash table for last-start times. */
		last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
		/* Keep the area alive even when no backend is attached. */
		dsa_pin(last_start_times_dsa);
		/* Keep this backend's mapping for the rest of its lifetime. */
		dsa_pin_mapping(last_start_times_dsa);
		last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);

		/* Store handles in shared memory for other backends to use. */
		LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
		LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
	}
	else if (!last_start_times)
	{
		/* Attach to existing dynamic shared hash table. */
		last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
		dsa_pin_mapping(last_start_times_dsa);
		last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
										 LogicalRepCtx->last_start_dsh, NULL);
	}

	MemoryContextSwitchTo(oldcontext);
	LWLockRelease(LogicalRepWorkerLock);
}
1108 :
1109 : /*
1110 : * Set the last-start time for the subscription.
1111 : */
1112 : static void
1113 234 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
1114 : {
1115 : LauncherLastStartTimesEntry *entry;
1116 : bool found;
1117 :
1118 234 : logicalrep_launcher_attach_dshmem();
1119 :
1120 234 : entry = dshash_find_or_insert(last_start_times, &subid, &found);
1121 234 : entry->last_start_time = start_time;
1122 234 : dshash_release_lock(last_start_times, entry);
1123 234 : }
1124 :
1125 : /*
1126 : * Return the last-start time for the subscription, or 0 if there isn't one.
1127 : */
1128 : static TimestampTz
1129 372 : ApplyLauncherGetWorkerStartTime(Oid subid)
1130 : {
1131 : LauncherLastStartTimesEntry *entry;
1132 : TimestampTz ret;
1133 :
1134 372 : logicalrep_launcher_attach_dshmem();
1135 :
1136 372 : entry = dshash_find(last_start_times, &subid, false);
1137 372 : if (entry == NULL)
1138 131 : return 0;
1139 :
1140 241 : ret = entry->last_start_time;
1141 241 : dshash_release_lock(last_start_times, entry);
1142 :
1143 241 : return ret;
1144 : }
1145 :
1146 : /*
1147 : * Remove the last-start-time entry for the subscription, if one exists.
1148 : *
1149 : * This has two use-cases: to remove the entry related to a subscription
1150 : * that's been deleted or disabled (just to avoid leaking shared memory),
1151 : * and to allow immediate restart of an apply worker that has exited
1152 : * due to subscription parameter changes.
1153 : */
1154 : void
1155 264 : ApplyLauncherForgetWorkerStartTime(Oid subid)
1156 : {
1157 264 : logicalrep_launcher_attach_dshmem();
1158 :
1159 264 : (void) dshash_delete_key(last_start_times, &subid);
1160 264 : }
1161 :
1162 : /*
1163 : * Wakeup the launcher on commit if requested.
1164 : */
1165 : void
1166 626643 : AtEOXact_ApplyLauncher(bool isCommit)
1167 : {
1168 626643 : if (isCommit)
1169 : {
1170 591233 : if (on_commit_launcher_wakeup)
1171 154 : ApplyLauncherWakeup();
1172 : }
1173 :
1174 626643 : on_commit_launcher_wakeup = false;
1175 626643 : }
1176 :
1177 : /*
1178 : * Request wakeup of the launcher on commit of the transaction.
1179 : *
1180 : * This is used to send launcher signal to stop sleeping and process the
1181 : * subscriptions when current transaction commits. Should be used when new
1182 : * tuple was added to the pg_subscription catalog.
1183 : */
1184 : void
1185 155 : ApplyLauncherWakeupAtCommit(void)
1186 : {
1187 155 : if (!on_commit_launcher_wakeup)
1188 154 : on_commit_launcher_wakeup = true;
1189 155 : }
1190 :
1191 : /*
1192 : * Wakeup the launcher immediately.
1193 : */
1194 : void
1195 821 : ApplyLauncherWakeup(void)
1196 : {
1197 821 : if (LogicalRepCtx->launcher_pid != 0)
1198 806 : kill(LogicalRepCtx->launcher_pid, SIGUSR1);
1199 821 : }
1200 :
/*
 * Main loop for the apply launcher process.
 *
 * Each cycle scans pg_subscription, launches an apply worker for every
 * enabled subscription that lacks one (throttled per subscription by
 * wal_retrieve_retry_interval), and maintains the conflict-detection
 * replication slot for subscriptions with retain_dead_tuples.  Between
 * cycles we sleep on our latch for at most wait_time milliseconds.
 * Never returns.
 */
void
ApplyLauncherMain(Datum main_arg)
{
	ereport(DEBUG1,
			(errmsg_internal("logical replication launcher started")));

	before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);

	/* Advertise our PID so ApplyLauncherWakeup() can signal us. */
	Assert(LogicalRepCtx->launcher_pid == 0);
	LogicalRepCtx->launcher_pid = MyProcPid;

	/* Establish signal handlers. */
	pqsignal(SIGHUP, SignalHandlerForConfigReload);
	BackgroundWorkerUnblockSignals();

	/*
	 * Establish connection to nailed catalogs (we only ever access
	 * pg_subscription).
	 */
	BackgroundWorkerInitializeConnection(NULL, NULL, 0);

	/*
	 * Acquire the conflict detection slot at startup to ensure it can be
	 * dropped if no longer needed after a restart.
	 */
	acquire_conflict_slot_if_exists();

	/* Enter main loop */
	for (;;)
	{
		int			rc;
		List	   *sublist;
		ListCell   *lc;
		MemoryContext subctx;
		MemoryContext oldctx;
		/* Sleep at most this long before the next cycle; may be shortened. */
		long		wait_time = DEFAULT_NAPTIME_PER_CYCLE;
		bool		can_update_xmin = true;
		bool		retain_dead_tuples = false;
		TransactionId xmin = InvalidTransactionId;

		CHECK_FOR_INTERRUPTS();

		/* Use temporary context to avoid leaking memory across cycles. */
		subctx = AllocSetContextCreate(TopMemoryContext,
									   "Logical Replication Launcher sublist",
									   ALLOCSET_DEFAULT_SIZES);
		oldctx = MemoryContextSwitchTo(subctx);

		/*
		 * Start any missing workers for enabled subscriptions.
		 *
		 * Also, during the iteration through all subscriptions, we compute
		 * the minimum XID required to protect deleted tuples for conflict
		 * detection if one of the subscription enables retain_dead_tuples
		 * option.
		 */
		sublist = get_subscription_list();
		foreach(lc, sublist)
		{
			Subscription *sub = (Subscription *) lfirst(lc);
			LogicalRepWorker *w;
			TimestampTz last_start;
			TimestampTz now;
			long		elapsed;

			if (sub->retaindeadtuples)
			{
				retain_dead_tuples = true;

				/*
				 * Create a replication slot to retain information necessary
				 * for conflict detection such as dead tuples, commit
				 * timestamps, and origins.
				 *
				 * The slot is created before starting the apply worker to
				 * prevent it from unnecessarily maintaining its
				 * oldest_nonremovable_xid.
				 *
				 * The slot is created even for a disabled subscription to
				 * ensure that conflict-related information is available when
				 * applying remote changes that occurred before the
				 * subscription was enabled.
				 */
				CreateConflictDetectionSlot();

				if (sub->retentionactive)
				{
					/*
					 * Can't advance xmin of the slot unless all the
					 * subscriptions actively retaining dead tuples are
					 * enabled. This is required to ensure that we don't
					 * advance the xmin of CONFLICT_DETECTION_SLOT if one of
					 * the subscriptions is not enabled. Otherwise, we won't
					 * be able to detect conflicts reliably for such a
					 * subscription even though it has set the
					 * retain_dead_tuples option.
					 */
					can_update_xmin &= sub->enabled;

					/*
					 * Initialize the slot once the subscription activates
					 * retention.
					 */
					if (!TransactionIdIsValid(MyReplicationSlot->data.xmin))
						init_conflict_slot_xmin();
				}
			}

			if (!sub->enabled)
				continue;

			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
			w = logicalrep_worker_find(WORKERTYPE_APPLY, sub->oid, InvalidOid,
									   false);

			if (w != NULL)
			{
				/*
				 * Compute the minimum xmin required to protect dead tuples
				 * required for conflict detection among all running apply
				 * workers. This computation is performed while holding
				 * LogicalRepWorkerLock to prevent accessing invalid worker
				 * data, in scenarios where a worker might exit and reset its
				 * state concurrently.
				 */
				if (sub->retaindeadtuples &&
					sub->retentionactive &&
					can_update_xmin)
					compute_min_nonremovable_xid(w, &xmin);

				LWLockRelease(LogicalRepWorkerLock);

				/* worker is running already */
				continue;
			}

			LWLockRelease(LogicalRepWorkerLock);

			/*
			 * Can't advance xmin of the slot unless all the workers
			 * corresponding to subscriptions actively retaining dead tuples
			 * are running, disabling the further computation of the minimum
			 * nonremovable xid.
			 */
			if (sub->retaindeadtuples && sub->retentionactive)
				can_update_xmin = false;

			/*
			 * If the worker is eligible to start now, launch it. Otherwise,
			 * adjust wait_time so that we'll wake up as soon as it can be
			 * started.
			 *
			 * Each subscription's apply worker can only be restarted once per
			 * wal_retrieve_retry_interval, so that errors do not cause us to
			 * repeatedly restart the worker as fast as possible. In cases
			 * where a restart is expected (e.g., subscription parameter
			 * changes), another process should remove the last-start entry
			 * for the subscription so that the worker can be restarted
			 * without waiting for wal_retrieve_retry_interval to elapse.
			 */
			last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
			now = GetCurrentTimestamp();
			if (last_start == 0 ||
				(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
			{
				ApplyLauncherSetWorkerStartTime(sub->oid, now);
				if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
											  sub->dbid, sub->oid, sub->name,
											  sub->owner, InvalidOid,
											  DSM_HANDLE_INVALID,
											  sub->retaindeadtuples &&
											  sub->retentionactive))
				{
					/*
					 * We get here either if we failed to launch a worker
					 * (perhaps for resource-exhaustion reasons) or if we
					 * launched one but it immediately quit. Either way, it
					 * seems appropriate to try again after
					 * wal_retrieve_retry_interval.
					 */
					wait_time = Min(wait_time,
									wal_retrieve_retry_interval);
				}
			}
			else
			{
				wait_time = Min(wait_time,
								wal_retrieve_retry_interval - elapsed);
			}
		}

		/*
		 * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
		 * that requires us to retain dead tuples. Otherwise, if required,
		 * advance the slot's xmin to protect dead tuples required for the
		 * conflict detection.
		 *
		 * Additionally, if all apply workers for subscriptions with
		 * retain_dead_tuples enabled have requested to stop retention, the
		 * slot's xmin will be set to InvalidTransactionId allowing the
		 * removal of dead tuples.
		 */
		if (MyReplicationSlot)
		{
			if (!retain_dead_tuples)
				ReplicationSlotDropAcquired();
			else if (can_update_xmin)
				update_conflict_slot_xmin(xmin);
		}

		/* Switch back to original memory context. */
		MemoryContextSwitchTo(oldctx);
		/* Clean the temporary memory. */
		MemoryContextDelete(subctx);

		/* Wait for more work. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   wait_time,
					   WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
		}
	}

	/* Not reachable */
}
1440 :
1441 : /*
1442 : * Determine the minimum non-removable transaction ID across all apply workers
1443 : * for subscriptions that have retain_dead_tuples enabled. Store the result
1444 : * in *xmin.
1445 : */
1446 : static void
1447 66 : compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
1448 : {
1449 : TransactionId nonremovable_xid;
1450 :
1451 : Assert(worker != NULL);
1452 :
1453 : /*
1454 : * The replication slot for conflict detection must be created before the
1455 : * worker starts.
1456 : */
1457 : Assert(MyReplicationSlot);
1458 :
1459 66 : SpinLockAcquire(&worker->relmutex);
1460 66 : nonremovable_xid = worker->oldest_nonremovable_xid;
1461 66 : SpinLockRelease(&worker->relmutex);
1462 :
1463 : /*
1464 : * Return if the apply worker has stopped retention concurrently.
1465 : *
1466 : * Although this function is invoked only when retentionactive is true,
1467 : * the apply worker might stop retention after the launcher fetches the
1468 : * retentionactive flag.
1469 : */
1470 66 : if (!TransactionIdIsValid(nonremovable_xid))
1471 0 : return;
1472 :
1473 66 : if (!TransactionIdIsValid(*xmin) ||
1474 0 : TransactionIdPrecedes(nonremovable_xid, *xmin))
1475 66 : *xmin = nonremovable_xid;
1476 : }
1477 :
1478 : /*
1479 : * Acquire the replication slot used to retain information for conflict
1480 : * detection, if it exists.
1481 : *
1482 : * Return true if successfully acquired, otherwise return false.
1483 : */
1484 : static bool
1485 509 : acquire_conflict_slot_if_exists(void)
1486 : {
1487 509 : if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
1488 508 : return false;
1489 :
1490 1 : ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
1491 1 : return true;
1492 : }
1493 :
1494 : /*
1495 : * Update the xmin the replication slot used to retain information required
1496 : * for conflict detection.
1497 : */
1498 : static void
1499 66 : update_conflict_slot_xmin(TransactionId new_xmin)
1500 : {
1501 : Assert(MyReplicationSlot);
1502 : Assert(!TransactionIdIsValid(new_xmin) ||
1503 : TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
1504 :
1505 : /* Return if the xmin value of the slot cannot be updated */
1506 66 : if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
1507 50 : return;
1508 :
1509 16 : SpinLockAcquire(&MyReplicationSlot->mutex);
1510 16 : MyReplicationSlot->effective_xmin = new_xmin;
1511 16 : MyReplicationSlot->data.xmin = new_xmin;
1512 16 : SpinLockRelease(&MyReplicationSlot->mutex);
1513 :
1514 16 : elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
1515 :
1516 16 : ReplicationSlotMarkDirty();
1517 16 : ReplicationSlotsComputeRequiredXmin(false);
1518 :
1519 : /*
1520 : * Like PhysicalConfirmReceivedLocation(), do not save slot information
1521 : * each time. This is acceptable because all concurrent transactions on
1522 : * the publisher that require the data preceding the slot's xmin should
1523 : * have already been applied and flushed on the subscriber before the xmin
1524 : * is advanced. So, even if the slot's xmin regresses after a restart, it
1525 : * will be advanced again in the next cycle. Therefore, no data required
1526 : * for conflict detection will be prematurely removed.
1527 : */
1528 16 : return;
1529 : }
1530 :
/*
 * Initialize the xmin for the conflict detection slot.
 *
 * Computes a safe starting horizon and publishes it as the slot's xmin,
 * then persists the slot to disk.
 */
static void
init_conflict_slot_xmin(void)
{
	TransactionId xmin_horizon;

	/* Replication slot must exist but shouldn't be initialized. */
	Assert(MyReplicationSlot &&
		   !TransactionIdIsValid(MyReplicationSlot->data.xmin));

	/*
	 * Take both locks exclusively so the horizon we compute cannot move
	 * between computing it and making it visible via
	 * ReplicationSlotsComputeRequiredXmin(); note the release order below
	 * mirrors the acquire order here.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	xmin_horizon = GetOldestSafeDecodingTransactionId(false);

	/* Install the horizon in the slot under its spinlock. */
	SpinLockAcquire(&MyReplicationSlot->mutex);
	MyReplicationSlot->effective_xmin = xmin_horizon;
	MyReplicationSlot->data.xmin = xmin_horizon;
	SpinLockRelease(&MyReplicationSlot->mutex);

	ReplicationSlotsComputeRequiredXmin(true);

	LWLockRelease(ProcArrayLock);
	LWLockRelease(ReplicationSlotControlLock);

	/* Write this slot to disk */
	ReplicationSlotMarkDirty();
	ReplicationSlotSave();
}
1562 :
1563 : /*
1564 : * Create and acquire the replication slot used to retain information for
1565 : * conflict detection, if not yet.
1566 : */
1567 : void
1568 71 : CreateConflictDetectionSlot(void)
1569 : {
1570 : /* Exit early, if the replication slot is already created and acquired */
1571 71 : if (MyReplicationSlot)
1572 67 : return;
1573 :
1574 4 : ereport(LOG,
1575 : errmsg("creating replication conflict detection slot"));
1576 :
1577 4 : ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
1578 : false, false);
1579 :
1580 4 : init_conflict_slot_xmin();
1581 : }
1582 :
1583 : /*
1584 : * Is current process the logical replication launcher?
1585 : */
1586 : bool
1587 2742 : IsLogicalLauncher(void)
1588 : {
1589 2742 : return LogicalRepCtx->launcher_pid == MyProcPid;
1590 : }
1591 :
1592 : /*
1593 : * Return the pid of the leader apply worker if the given pid is the pid of a
1594 : * parallel apply worker, otherwise, return InvalidPid.
1595 : */
1596 : pid_t
1597 954 : GetLeaderApplyWorkerPid(pid_t pid)
1598 : {
1599 954 : int leader_pid = InvalidPid;
1600 : int i;
1601 :
1602 954 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1603 :
1604 4770 : for (i = 0; i < max_logical_replication_workers; i++)
1605 : {
1606 3816 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
1607 :
1608 3816 : if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
1609 : {
1610 0 : leader_pid = w->leader_pid;
1611 0 : break;
1612 : }
1613 : }
1614 :
1615 954 : LWLockRelease(LogicalRepWorkerLock);
1616 :
1617 954 : return leader_pid;
1618 : }
1619 :
/*
 * Returns state of the subscriptions.
 *
 * SQL-callable set-returning function: one row per attached logical
 * replication worker, optionally filtered by subscription OID (first
 * argument; NULL means all subscriptions).
 */
Datum
pg_stat_get_subscription(PG_FUNCTION_ARGS)
{
#define PG_STAT_GET_SUBSCRIPTION_COLS	10
	Oid			subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
	int			i;
	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;

	InitMaterializedSRF(fcinfo, 0);

	/* Make sure we get consistent view of the workers. */
	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

	for (i = 0; i < max_logical_replication_workers; i++)
	{
		/* for each row */
		Datum		values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
		bool		nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
		int			worker_pid;
		LogicalRepWorker worker;

		/* Copy the slot to a local snapshot before inspecting it. */
		memcpy(&worker, &LogicalRepCtx->workers[i],
			   sizeof(LogicalRepWorker));
		/* Skip slots without a live backend attached. */
		if (!worker.proc || !IsBackendPid(worker.proc->pid))
			continue;

		/* Apply the optional subscription filter. */
		if (OidIsValid(subid) && worker.subid != subid)
			continue;

		worker_pid = worker.proc->pid;

		values[0] = ObjectIdGetDatum(worker.subid);
		/* relid is only meaningful for table synchronization workers. */
		if (isTableSyncWorker(&worker))
			values[1] = ObjectIdGetDatum(worker.relid);
		else
			nulls[1] = true;
		values[2] = Int32GetDatum(worker_pid);

		/* leader_pid applies only to parallel apply workers. */
		if (isParallelApplyWorker(&worker))
			values[3] = Int32GetDatum(worker.leader_pid);
		else
			nulls[3] = true;

		/* LSN/timestamp columns are NULL until the worker reports them. */
		if (!XLogRecPtrIsValid(worker.last_lsn))
			nulls[4] = true;
		else
			values[4] = LSNGetDatum(worker.last_lsn);
		if (worker.last_send_time == 0)
			nulls[5] = true;
		else
			values[5] = TimestampTzGetDatum(worker.last_send_time);
		if (worker.last_recv_time == 0)
			nulls[6] = true;
		else
			values[6] = TimestampTzGetDatum(worker.last_recv_time);
		if (!XLogRecPtrIsValid(worker.reply_lsn))
			nulls[7] = true;
		else
			values[7] = LSNGetDatum(worker.reply_lsn);
		if (worker.reply_time == 0)
			nulls[8] = true;
		else
			values[8] = TimestampTzGetDatum(worker.reply_time);

		/* Render the worker type as text. */
		switch (worker.type)
		{
			case WORKERTYPE_APPLY:
				values[9] = CStringGetTextDatum("apply");
				break;
			case WORKERTYPE_PARALLEL_APPLY:
				values[9] = CStringGetTextDatum("parallel apply");
				break;
			case WORKERTYPE_SEQUENCESYNC:
				values[9] = CStringGetTextDatum("sequence synchronization");
				break;
			case WORKERTYPE_TABLESYNC:
				values[9] = CStringGetTextDatum("table synchronization");
				break;
			case WORKERTYPE_UNKNOWN:
				/* Should never happen. */
				elog(ERROR, "unknown worker type");
		}

		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
							 values, nulls);

		/*
		 * If only a single subscription was requested, and we found it,
		 * break.
		 */
		if (OidIsValid(subid))
			break;
	}

	LWLockRelease(LogicalRepWorkerLock);

	return (Datum) 0;
}
|