Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * launcher.c
3 : * PostgreSQL logical replication worker launcher process
4 : *
5 : * Copyright (c) 2016-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/launcher.c
9 : *
10 : * NOTES
11 : * This module contains the logical replication worker launcher which
12 : * uses the background worker infrastructure to start the logical
13 : * replication workers for every enabled subscription.
14 : *
15 : *-------------------------------------------------------------------------
16 : */
17 :
18 : #include "postgres.h"
19 :
20 : #include "access/heapam.h"
21 : #include "access/htup.h"
22 : #include "access/htup_details.h"
23 : #include "access/tableam.h"
24 : #include "access/xact.h"
25 : #include "catalog/pg_subscription.h"
26 : #include "catalog/pg_subscription_rel.h"
27 : #include "funcapi.h"
28 : #include "lib/dshash.h"
29 : #include "miscadmin.h"
30 : #include "pgstat.h"
31 : #include "postmaster/bgworker.h"
32 : #include "postmaster/interrupt.h"
33 : #include "replication/logicallauncher.h"
34 : #include "replication/origin.h"
35 : #include "replication/walreceiver.h"
36 : #include "replication/worker_internal.h"
37 : #include "storage/ipc.h"
38 : #include "storage/proc.h"
39 : #include "storage/procarray.h"
40 : #include "tcop/tcopprot.h"
41 : #include "utils/builtins.h"
42 : #include "utils/memutils.h"
43 : #include "utils/pg_lsn.h"
44 : #include "utils/snapmgr.h"
45 :
46 : /* max sleep time between cycles (3min) */
47 : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
48 :
49 : /* GUC variables */
50 : int max_logical_replication_workers = 4;
51 : int max_sync_workers_per_subscription = 2;
52 : int max_parallel_apply_workers_per_subscription = 2;
53 :
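 : /* This process's slot in the shared workers array, or NULL if not a worker. */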
54 : LogicalRepWorker *MyLogicalRepWorker = NULL;
55 :
56 : typedef struct LogicalRepCtxStruct
57 : {
58 : /* Supervisor process. */
59 : pid_t launcher_pid;
60 :
61 : /* Hash table holding last start times of subscriptions' apply workers. */
62 : dsa_handle last_start_dsa;
63 : dshash_table_handle last_start_dsh;
64 :
65 : /* Background workers. */
66 : LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
67 : } LogicalRepCtxStruct;
68 :
69 : static LogicalRepCtxStruct *LogicalRepCtx;
70 :
71 : /* an entry in the last-start-times shared hash table */
72 : typedef struct LauncherLastStartTimesEntry
73 : {
74 : Oid subid; /* OID of logrep subscription (hash key) */
75 : TimestampTz last_start_time; /* last time its apply worker was started */
76 : } LauncherLastStartTimesEntry;
77 :
78 : /* parameters for the last-start-times shared hash table */
79 : static const dshash_parameters dsh_params = {
80 : sizeof(Oid), /* key size: subscription OID */
81 : sizeof(LauncherLastStartTimesEntry), /* entry size */
82 : dshash_memcmp, /* compare function */
83 : dshash_memhash, /* hash function */
84 : dshash_memcpy, /* copy function */
85 : LWTRANCHE_LAUNCHER_HASH /* LWLock tranche for the table's locks */
86 : };
87 :
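 : /* Process-local pointers to the shared last-start-times hash table. */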
88 : static dsa_area *last_start_times_dsa = NULL;
89 : static dshash_table *last_start_times = NULL;
90 :
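 : /* Whether to wake the launcher when the current transaction commits. */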
91 : static bool on_commit_launcher_wakeup = false;
92 :
93 :
94 : static void ApplyLauncherWakeup(void);
95 : static void logicalrep_launcher_onexit(int code, Datum arg);
96 : static void logicalrep_worker_onexit(int code, Datum arg);
97 : static void logicalrep_worker_detach(void);
98 : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
99 : static int logicalrep_pa_worker_count(Oid subid);
100 : static void logicalrep_launcher_attach_dshmem(void);
101 : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
102 : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
103 :
104 :
105 : /*
106 : * Load the list of subscriptions.
107 : *
108 : * Only the fields interesting for worker start/stop functions are filled for
109 : * each subscription.
110 : */
111 : static List *
112 5248 : get_subscription_list(void)
113 : {
114 5248 : List *res = NIL;
115 : Relation rel;
116 : TableScanDesc scan;
117 : HeapTuple tup;
118 : MemoryContext resultcxt;
119 :
120 : /* This is the context that we will allocate our output data in */
121 5248 : resultcxt = CurrentMemoryContext;
122 :
123 : /*
124 : * Start a transaction so we can access pg_subscription.
125 : */
126 5248 : StartTransactionCommand();
127 :
128 5248 : rel = table_open(SubscriptionRelationId, AccessShareLock);
129 5248 : scan = table_beginscan_catalog(rel, 0, NULL);
130 :
131 6526 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
132 : {
133 1278 : Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
134 : Subscription *sub;
135 : MemoryContext oldcxt;
136 :
137 : /*
138 : * Allocate our results in the caller's context, not the
139 : * transaction's. We do this inside the loop, and restore the original
140 : * context at the end, so that leaky things like heap_getnext() are
141 : * not called in a potentially long-lived context.
142 : */
143 1278 : oldcxt = MemoryContextSwitchTo(resultcxt);
144 :
145 1278 : sub = (Subscription *) palloc0(sizeof(Subscription));
146 1278 : sub->oid = subform->oid;
147 1278 : sub->dbid = subform->subdbid;
148 1278 : sub->owner = subform->subowner;
149 1278 : sub->enabled = subform->subenabled;
150 1278 : sub->name = pstrdup(NameStr(subform->subname));
151 : /* We don't fill fields we are not interested in. */
152 :
153 1278 : res = lappend(res, sub);
154 1278 : MemoryContextSwitchTo(oldcxt);
155 : }
156 :
157 5248 : table_endscan(scan);
158 5248 : table_close(rel, AccessShareLock);
159 :
160 5248 : CommitTransactionCommand();
161 :
162 5248 : return res;
163 : }
164 :
165 : /*
166 : * Wait for a background worker to start up and attach to the shmem context.
167 : *
168 : * This is only needed for cleaning up the shared memory in case the worker
169 : * fails to attach.
170 : *
171 : * Returns whether the attach was successful.
172 : */
173 : static bool
174 696 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
175 : uint16 generation,
176 : BackgroundWorkerHandle *handle)
177 : {
178 696 : bool result = false;
179 696 : bool dropped_latch = false;
180 :
181 : for (;;)
182 2000 : {
183 : BgwHandleStatus status;
184 : pid_t pid;
185 : int rc;
186 :
187 2696 : CHECK_FOR_INTERRUPTS();
188 :
189 2696 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
190 :
191 : /* Worker either died or has started. Return false if died. */
192 2696 : if (!worker->in_use || worker->proc)
193 : {
194 694 : result = worker->in_use;
195 694 : LWLockRelease(LogicalRepWorkerLock);
196 694 : break;
197 : }
198 :
199 2002 : LWLockRelease(LogicalRepWorkerLock);
200 :
201 : /* Check if worker has died before attaching, and clean up after it. */
202 2002 : status = GetBackgroundWorkerPid(handle, &pid);
203 :
204 2002 : if (status == BGWH_STOPPED)
205 : {
206 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
207 : /* Ensure that this was indeed the worker we waited for. */
208 0 : if (generation == worker->generation)
209 0 : logicalrep_worker_cleanup(worker);
210 0 : LWLockRelease(LogicalRepWorkerLock);
211 0 : break; /* result is already false */
212 : }
213 :
214 : /*
215 : * We need a timeout because we generally don't get notified via latch
216 : * about the worker attaching. But we don't expect to have to wait long.
217 : */
218 2002 : rc = WaitLatch(MyLatch,
219 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
220 : 10L, WAIT_EVENT_BGWORKER_STARTUP);
221 :
222 2002 : if (rc & WL_LATCH_SET)
223 : {
224 848 : ResetLatch(MyLatch);
225 848 : CHECK_FOR_INTERRUPTS();
226 846 : dropped_latch = true;
227 : }
228 : }
229 :
230 : /*
231 : * If we had to clear a latch event in order to wait, be sure to restore
232 : * it before exiting. Otherwise the caller may miss events.
233 : */
234 694 : if (dropped_latch)
235 694 : SetLatch(MyLatch);
236 :
237 694 : return result;
238 : }
239 :
240 : /*
241 : * Walks the workers array and searches for one that matches the given
242 : * subscription id and relid.
243 : *
244 : * We are only interested in the leader apply worker or table sync worker.
245 : */
246 : LogicalRepWorker *
247 5542 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
248 : {
249 : int i;
250 5542 : LogicalRepWorker *res = NULL;
251 :
252 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
253 :
254 : /* Search for attached worker for a given subscription id. */
255 16654 : for (i = 0; i < max_logical_replication_workers; i++)
256 : {
257 14612 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
258 :
259 : /* Skip parallel apply workers. */
260 14612 : if (isParallelApplyWorker(w))
261 0 : continue;
262 :
263 14612 : if (w->in_use && w->subid == subid && w->relid == relid &&
264 3500 : (!only_running || w->proc))
265 : {
266 3500 : res = w;
267 3500 : break;
268 : }
269 : }
270 :
271 5542 : return res;
272 : }
273 :
274 : /*
275 : * Similar to logicalrep_worker_find(), but returns a list of all workers for
276 : * the subscription, instead of just one.
277 : */
278 : List *
279 1050 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
280 : {
281 : int i;
282 1050 : List *res = NIL;
283 :
284 1050 : if (acquire_lock)
285 216 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
286 :
287 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
288 :
289 : /* Search for attached workers for the given subscription id. */
290 5442 : for (i = 0; i < max_logical_replication_workers; i++)
291 : {
292 4392 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
293 :
294 4392 : if (w->in_use && w->subid == subid && (!only_running || w->proc))
295 716 : res = lappend(res, w);
296 : }
297 :
298 1050 : if (acquire_lock)
299 216 : LWLockRelease(LogicalRepWorkerLock);
300 :
301 1050 : return res;
302 : }
303 :
304 : /*
305 : * Start new logical replication background worker, if possible.
306 : *
307 : * Returns true on success, false on failure.
308 : */
309 : bool
310 696 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
311 : Oid dbid, Oid subid, const char *subname, Oid userid,
312 : Oid relid, dsm_handle subworker_dsm)
313 : {
314 : BackgroundWorker bgw;
315 : BackgroundWorkerHandle *bgw_handle;
316 : uint16 generation;
317 : int i;
318 696 : int slot = 0;
319 696 : LogicalRepWorker *worker = NULL;
320 : int nsyncworkers;
321 : int nparallelapplyworkers;
322 : TimestampTz now;
323 696 : bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
324 696 : bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
325 :
326 : /*----------
327 : * Sanity checks:
328 : * - must be valid worker type
329 : * - tablesync workers are the only ones to have a relid
330 : * - parallel apply worker is the only kind of subworker
331 : */
332 : Assert(wtype != WORKERTYPE_UNKNOWN);
333 : Assert(is_tablesync_worker == OidIsValid(relid));
334 : Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
335 :
336 696 : ereport(DEBUG1,
337 : (errmsg_internal("starting logical replication worker for subscription \"%s\"",
338 : subname)));
339 :
340 : /* Report this after the initial starting message for consistency. */
341 696 : if (max_active_replication_origins == 0)
342 0 : ereport(ERROR,
343 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
344 : errmsg("cannot start logical replication workers when \"max_active_replication_origins\"=0")));
345 :
346 : /*
347 : * We need to modify the shared memory under lock so that we have a
348 : * consistent view.
349 : */
350 696 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
351 :
352 696 : retry:
353 : /* Find unused worker slot. */
354 1278 : for (i = 0; i < max_logical_replication_workers; i++)
355 : {
356 1278 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
357 :
358 1278 : if (!w->in_use)
359 : {
360 696 : worker = w;
361 696 : slot = i;
362 696 : break;
363 : }
364 : }
365 :
366 696 : nsyncworkers = logicalrep_sync_worker_count(subid);
367 :
368 696 : now = GetCurrentTimestamp();
369 :
370 : /*
371 : * If we didn't find a free slot, try to do garbage collection. We do
372 : * this because if some worker failed to start up and its parent crashed
373 : * while waiting, the in_use state was never cleared.
374 : */
375 696 : if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
376 : {
377 0 : bool did_cleanup = false;
378 :
379 0 : for (i = 0; i < max_logical_replication_workers; i++)
380 : {
381 0 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
382 :
383 : /*
384 : * If the worker was marked in use but didn't manage to attach in
385 : * time, clean it up.
386 : */
387 0 : if (w->in_use && !w->proc &&
388 0 : TimestampDifferenceExceeds(w->launch_time, now,
389 : wal_receiver_timeout))
390 : {
391 0 : elog(WARNING,
392 : "logical replication worker for subscription %u took too long to start; canceled",
393 : w->subid);
394 :
395 0 : logicalrep_worker_cleanup(w);
396 0 : did_cleanup = true;
397 : }
398 : }
399 :
400 0 : if (did_cleanup)
401 0 : goto retry;
402 : }
403 :
404 : /*
405 : * We don't allow starting more sync workers once we have reached the
406 : * sync worker limit per subscription. So, just return silently, as we
407 : * might get here because of an otherwise harmless race condition.
408 : */
409 696 : if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
410 : {
411 0 : LWLockRelease(LogicalRepWorkerLock);
412 0 : return false;
413 : }
414 :
415 696 : nparallelapplyworkers = logicalrep_pa_worker_count(subid);
416 :
417 : /*
418 : * Return false if the number of parallel apply workers has reached the
419 : * limit per subscription.
420 : */
421 696 : if (is_parallel_apply_worker &&
422 20 : nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
423 : {
424 0 : LWLockRelease(LogicalRepWorkerLock);
425 0 : return false;
426 : }
427 :
428 : /*
429 : * However, if there are no more free worker slots, inform the user
430 : * before exiting.
431 : */
432 696 : if (worker == NULL)
433 : {
434 0 : LWLockRelease(LogicalRepWorkerLock);
435 0 : ereport(WARNING,
436 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
437 : errmsg("out of logical replication worker slots"),
438 : errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
439 0 : return false;
440 : }
441 :
442 : /* Prepare the worker slot. */
443 696 : worker->type = wtype;
444 696 : worker->launch_time = now;
445 696 : worker->in_use = true;
446 696 : worker->generation++;
447 696 : worker->proc = NULL;
448 696 : worker->dbid = dbid;
449 696 : worker->userid = userid;
450 696 : worker->subid = subid;
451 696 : worker->relid = relid;
452 696 : worker->relstate = SUBREL_STATE_UNKNOWN;
453 696 : worker->relstate_lsn = InvalidXLogRecPtr;
454 696 : worker->stream_fileset = NULL;
455 696 : worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
456 696 : worker->parallel_apply = is_parallel_apply_worker;
457 696 : worker->last_lsn = InvalidXLogRecPtr;
458 696 : TIMESTAMP_NOBEGIN(worker->last_send_time);
459 696 : TIMESTAMP_NOBEGIN(worker->last_recv_time);
460 696 : worker->reply_lsn = InvalidXLogRecPtr;
461 696 : TIMESTAMP_NOBEGIN(worker->reply_time);
462 :
463 : /* Before releasing lock, remember generation for future identification. */
464 696 : generation = worker->generation;
465 :
466 696 : LWLockRelease(LogicalRepWorkerLock);
467 :
468 : /* Register the new dynamic worker. */
469 696 : memset(&bgw, 0, sizeof(bgw));
470 696 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
471 : BGWORKER_BACKEND_DATABASE_CONNECTION;
472 696 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
473 696 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
474 :
475 696 : switch (worker->type)
476 : {
477 292 : case WORKERTYPE_APPLY:
478 292 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
479 292 : snprintf(bgw.bgw_name, BGW_MAXLEN,
480 : "logical replication apply worker for subscription %u",
481 : subid);
482 292 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
483 292 : break;
484 :
485 20 : case WORKERTYPE_PARALLEL_APPLY:
486 20 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
487 20 : snprintf(bgw.bgw_name, BGW_MAXLEN,
488 : "logical replication parallel apply worker for subscription %u",
489 : subid);
490 20 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
491 :
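 : /* Pass the shared memory handle to the parallel apply worker via bgw_extra. */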
492 20 : memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
493 20 : break;
494 :
495 384 : case WORKERTYPE_TABLESYNC:
496 384 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
497 384 : snprintf(bgw.bgw_name, BGW_MAXLEN,
498 : "logical replication tablesync worker for subscription %u sync %u",
499 : subid,
500 : relid);
501 384 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
502 384 : break;
503 :
504 0 : case WORKERTYPE_UNKNOWN:
505 : /* Should never happen. */
506 0 : elog(ERROR, "unknown worker type");
507 : }
508 :
509 696 : bgw.bgw_restart_time = BGW_NEVER_RESTART;
510 696 : bgw.bgw_notify_pid = MyProcPid;
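 : /* The worker uses this slot index to attach; see logicalrep_worker_attach(). */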
511 696 : bgw.bgw_main_arg = Int32GetDatum(slot);
512 :
513 696 : if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
514 : {
515 : /* Failed to start worker, so clean up the worker slot. */
516 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
517 : Assert(generation == worker->generation);
518 0 : logicalrep_worker_cleanup(worker);
519 0 : LWLockRelease(LogicalRepWorkerLock);
520 :
521 0 : ereport(WARNING,
522 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
523 : errmsg("out of background worker slots"),
524 : errhint("You might need to increase \"%s\".", "max_worker_processes")));
525 0 : return false;
526 : }
527 :
528 : /* Now wait until it attaches. */
529 696 : return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
530 : }
531 :
532 : /*
533 : * Internal function to stop the worker and wait until it detaches from the
534 : * slot.
535 : */
536 : static void
537 158 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
538 : {
539 : uint16 generation;
540 :
541 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
542 :
543 : /*
544 : * Remember our worker's generation so we can later check whether the slot
545 : * still holds the same worker.
546 : */
547 158 : generation = worker->generation;
548 :
549 : /*
550 : * If we found a worker but it does not have proc set then it is still
551 : * starting up; wait for it to finish starting and then kill it.
552 : */
553 162 : while (worker->in_use && !worker->proc)
554 : {
555 : int rc;
556 :
557 10 : LWLockRelease(LogicalRepWorkerLock);
558 :
559 : /* Wait a bit --- we don't expect to have to wait long. */
560 10 : rc = WaitLatch(MyLatch,
561 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
562 : 10L, WAIT_EVENT_BGWORKER_STARTUP);
563 :
564 10 : if (rc & WL_LATCH_SET)
565 : {
566 0 : ResetLatch(MyLatch);
567 0 : CHECK_FOR_INTERRUPTS();
568 : }
569 :
570 : /* Recheck worker status. */
571 10 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
572 :
573 : /*
574 : * Check whether the worker slot is no longer used, which would mean
575 : * that the worker has exited, or whether the worker generation is
576 : * different, meaning that a different worker has taken the slot.
577 : */
578 10 : if (!worker->in_use || worker->generation != generation)
579 0 : return;
580 :
581 : /* Worker has assigned proc, so it has started. */
582 10 : if (worker->proc)
583 6 : break;
584 : }
585 :
586 : /* Now terminate the worker ... */
587 158 : kill(worker->proc->pid, signo);
588 :
589 : /* ... and wait for it to die. */
590 : for (;;)
591 242 : {
592 : int rc;
593 :
594 : /* is it gone? */
595 400 : if (!worker->proc || worker->generation != generation)
596 : break;
597 :
598 242 : LWLockRelease(LogicalRepWorkerLock);
599 :
600 : /* Wait a bit --- we don't expect to have to wait long. */
601 242 : rc = WaitLatch(MyLatch,
602 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
603 : 10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
604 :
605 242 : if (rc & WL_LATCH_SET)
606 : {
607 116 : ResetLatch(MyLatch);
608 116 : CHECK_FOR_INTERRUPTS();
609 : }
610 :
611 242 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
612 : }
613 : }
614 :
615 : /*
616 : * Stop the logical replication worker for subid/relid, if any.
617 : */
618 : void
619 182 : logicalrep_worker_stop(Oid subid, Oid relid)
620 : {
621 : LogicalRepWorker *worker;
622 :
623 182 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
624 :
625 182 : worker = logicalrep_worker_find(subid, relid, false);
626 :
627 182 : if (worker)
628 : {
629 : Assert(!isParallelApplyWorker(worker));
630 142 : logicalrep_worker_stop_internal(worker, SIGTERM);
631 : }
632 :
633 182 : LWLockRelease(LogicalRepWorkerLock);
634 182 : }
635 :
636 : /*
637 : * Stop the given logical replication parallel apply worker.
638 : *
639 : * Note that the function sends SIGINT instead of SIGTERM to the parallel apply
640 : * worker so that the worker exits cleanly.
641 : */
642 : void
643 10 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
644 : {
645 : int slot_no;
646 : uint16 generation;
647 : LogicalRepWorker *worker;
648 :
649 10 : SpinLockAcquire(&winfo->shared->mutex);
650 10 : generation = winfo->shared->logicalrep_worker_generation;
651 10 : slot_no = winfo->shared->logicalrep_worker_slot_no;
652 10 : SpinLockRelease(&winfo->shared->mutex);
653 :
654 : Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
655 :
656 : /*
657 : * Detach from the error_mq_handle for the parallel apply worker before
658 : * stopping it. This prevents the leader apply worker from trying to
659 : * read from an error queue that the parallel apply worker might already
660 : * have detached.
661 : */
662 10 : if (winfo->error_mq_handle)
663 : {
664 10 : shm_mq_detach(winfo->error_mq_handle);
665 10 : winfo->error_mq_handle = NULL;
666 : }
667 :
668 10 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
669 :
670 10 : worker = &LogicalRepCtx->workers[slot_no];
671 : Assert(isParallelApplyWorker(worker));
672 :
673 : /*
674 : * Only stop the worker if the generation matches and the worker is alive.
675 : */
676 10 : if (worker->generation == generation && worker->proc)
677 10 : logicalrep_worker_stop_internal(worker, SIGINT);
678 :
679 10 : LWLockRelease(LogicalRepWorkerLock);
680 10 : }
681 :
682 : /*
683 : * Wake up (using latch) any logical replication worker for the specified sub/rel.
684 : */
685 : void
686 418 : logicalrep_worker_wakeup(Oid subid, Oid relid)
687 : {
688 : LogicalRepWorker *worker;
689 :
690 418 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
691 :
692 418 : worker = logicalrep_worker_find(subid, relid, true);
693 :
694 418 : if (worker)
695 418 : logicalrep_worker_wakeup_ptr(worker);
696 :
697 418 : LWLockRelease(LogicalRepWorkerLock);
698 418 : }
699 :
700 : /*
701 : * Wake up (using latch) the specified logical replication worker.
702 : *
703 : * Caller must hold lock, else worker->proc could change under us.
704 : */
705 : void
706 1252 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
707 : {
708 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
709 :
710 1252 : SetLatch(&worker->proc->procLatch);
711 1252 : }
712 :
713 : /*
714 : * Attach to a slot.
715 : */
716 : void
717 866 : logicalrep_worker_attach(int slot)
718 : {
719 : /* Block concurrent access. */
720 866 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
721 :
722 : Assert(slot >= 0 && slot < max_logical_replication_workers);
723 866 : MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
724 :
725 866 : if (!MyLogicalRepWorker->in_use)
726 : {
727 0 : LWLockRelease(LogicalRepWorkerLock);
728 0 : ereport(ERROR,
729 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
730 : errmsg("logical replication worker slot %d is empty, cannot attach",
731 : slot)));
732 : }
733 :
734 866 : if (MyLogicalRepWorker->proc)
735 : {
736 0 : LWLockRelease(LogicalRepWorkerLock);
737 0 : ereport(ERROR,
738 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
739 : errmsg("logical replication worker slot %d is already used by "
740 : "another worker, cannot attach", slot)));
741 : }
742 :
743 866 : MyLogicalRepWorker->proc = MyProc;
744 866 : before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
745 :
746 866 : LWLockRelease(LogicalRepWorkerLock);
747 866 : }
748 :
749 : /*
750 : * Stop the parallel apply workers if any, and detach the leader apply worker
751 : * (cleans up the worker info).
752 : */
753 : static void
754 866 : logicalrep_worker_detach(void)
755 : {
756 : /* Stop the parallel apply workers. */
757 866 : if (am_leader_apply_worker())
758 : {
759 : List *workers;
760 : ListCell *lc;
761 :
762 : /*
763 : * Detach from the error_mq_handle for all parallel apply workers
764 : * before terminating them. This prevents the leader apply worker from
765 : * receiving the worker termination message and writing it to the log
766 : * when the parallel worker has already logged it itself.
767 : */
768 458 : pa_detach_all_error_mq();
769 :
770 458 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
771 :
772 458 : workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
773 922 : foreach(lc, workers)
774 : {
775 464 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
776 :
777 464 : if (isParallelApplyWorker(w))
778 6 : logicalrep_worker_stop_internal(w, SIGTERM);
779 : }
780 :
781 458 : LWLockRelease(LogicalRepWorkerLock);
782 : }
783 :
784 : /* Block concurrent access. */
785 866 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
786 :
787 866 : logicalrep_worker_cleanup(MyLogicalRepWorker);
788 :
789 866 : LWLockRelease(LogicalRepWorkerLock);
790 866 : }
791 :
792 : /*
793 : * Clean up worker info.
794 : */
795 : static void
796 866 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
797 : {
798 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
799 :
800 866 : worker->type = WORKERTYPE_UNKNOWN;
801 866 : worker->in_use = false;
802 866 : worker->proc = NULL;
803 866 : worker->dbid = InvalidOid;
804 866 : worker->userid = InvalidOid;
805 866 : worker->subid = InvalidOid;
806 866 : worker->relid = InvalidOid;
807 866 : worker->leader_pid = InvalidPid;
808 866 : worker->parallel_apply = false;
809 866 : }
810 :
811 : /*
812 : * Cleanup function for logical replication launcher.
813 : *
814 : * Called on logical replication launcher exit.
815 : */
816 : static void
817 812 : logicalrep_launcher_onexit(int code, Datum arg)
818 : {
819 812 : LogicalRepCtx->launcher_pid = 0;
820 812 : }
821 :
822 : /*
823 : * Cleanup function.
824 : *
825 : * Called on logical replication worker exit.
826 : */
827 : static void
828 866 : logicalrep_worker_onexit(int code, Datum arg)
829 : {
830 : /* Disconnect gracefully from the remote side. */
831 866 : if (LogRepWorkerWalRcvConn)
832 768 : walrcv_disconnect(LogRepWorkerWalRcvConn);
833 :
834 866 : logicalrep_worker_detach();
835 :
836 : /* Cleanup fileset used for streaming transactions. */
837 866 : if (MyLogicalRepWorker->stream_fileset != NULL)
838 28 : FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
839 :
840 : /*
841 : * Session level locks may be acquired outside of a transaction in
842 : * parallel apply mode and will not be released when the worker
843 : * terminates, so manually release all locks before the worker exits.
844 : *
845 : * The locks will be acquired once the worker is initialized.
846 : */
847 866 : if (!InitializingApplyWorker)
848 860 : LockReleaseAll(DEFAULT_LOCKMETHOD, true);
849 :
850 866 : ApplyLauncherWakeup();
851 866 : }
852 :
853 : /*
854 : * Count the number of registered (not necessarily running) sync workers
855 : * for a subscription.
856 : */
857 : int
858 2282 : logicalrep_sync_worker_count(Oid subid)
859 : {
860 : int i;
861 2282 : int res = 0;
862 :
863 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
864 :
865 : /* Search for attached workers for the given subscription id. */
866 11754 : for (i = 0; i < max_logical_replication_workers; i++)
867 : {
868 9472 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
869 :
870 9472 : if (isTablesyncWorker(w) && w->subid == subid)
871 2640 : res++;
872 : }
873 :
874 2282 : return res;
875 : }
876 :
877 : /*
878 : * Count the number of registered (but not necessarily running) parallel apply
879 : * workers for a subscription.
880 : */
881 : static int
882 696 : logicalrep_pa_worker_count(Oid subid)
883 : {
884 : int i;
885 696 : int res = 0;
886 :
887 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
888 :
889 : /*
890 : * Scan all attached parallel apply workers, only counting those which
891 : * have the given subscription id.
892 : */
893 3668 : for (i = 0; i < max_logical_replication_workers; i++)
894 : {
895 2972 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
896 :
897 2972 : if (isParallelApplyWorker(w) && w->subid == subid)
898 4 : res++;
899 : }
900 :
901 696 : return res;
902 : }
903 :
904 : /*
905 : * ApplyLauncherShmemSize
906 : * Compute space needed for replication launcher shared memory
907 : */
908 : Size
909 8204 : ApplyLauncherShmemSize(void)
910 : {
911 : Size size;
912 :
913 : /*
914 : * Need the fixed struct and the array of LogicalRepWorker.
915 : */
916 8204 : size = sizeof(LogicalRepCtxStruct);
917 8204 : size = MAXALIGN(size);
918 8204 : size = add_size(size, mul_size(max_logical_replication_workers,
919 : sizeof(LogicalRepWorker)));
920 8204 : return size;
921 : }
922 :
923 : /*
924 : * ApplyLauncherRegister
925 : * Register a background worker running the logical replication launcher.
926 : */
927 : void
928 1712 : ApplyLauncherRegister(void)
929 : {
930 : BackgroundWorker bgw;
931 :
932 : /*
933 : * The logical replication launcher is disabled during binary upgrades, to
934 : * prevent logical replication workers from running on the source cluster.
935 : * That could cause replication origins to move forward after having been
936 : * copied to the target cluster, potentially creating conflicts with the
937 : * copied data files.
938 : */
939 1712 : if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
940 100 : return;
941 :
942 1612 : memset(&bgw, 0, sizeof(bgw));
943 1612 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
944 : BGWORKER_BACKEND_DATABASE_CONNECTION;
945 1612 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
946 1612 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
947 1612 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
948 1612 : snprintf(bgw.bgw_name, BGW_MAXLEN,
949 : "logical replication launcher");
950 1612 : snprintf(bgw.bgw_type, BGW_MAXLEN,
951 : "logical replication launcher");
952 1612 : bgw.bgw_restart_time = 5;
953 1612 : bgw.bgw_notify_pid = 0;
954 1612 : bgw.bgw_main_arg = (Datum) 0;
955 :
956 1612 : RegisterBackgroundWorker(&bgw);
957 : }
958 :
959 : /*
960 : * ApplyLauncherShmemInit
961 : * Allocate and initialize replication launcher shared memory
962 : */
963 : void
964 2126 : ApplyLauncherShmemInit(void)
965 : {
966 : bool found;
967 :
968 2126 : LogicalRepCtx = (LogicalRepCtxStruct *)
969 2126 : ShmemInitStruct("Logical Replication Launcher Data",
970 : ApplyLauncherShmemSize(),
971 : &found);
972 :
973 2126 : if (!found)
974 : {
975 : int slot;
976 :
977 2126 : memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
978 :
979 2126 : LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
980 2126 : LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
981 :
982 : /* Initialize memory and spin locks for each worker slot. */
983 10564 : for (slot = 0; slot < max_logical_replication_workers; slot++)
984 : {
985 8438 : LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
986 :
987 8438 : memset(worker, 0, sizeof(LogicalRepWorker));
988 8438 : SpinLockInit(&worker->relmutex);
989 : }
990 : }
991 2126 : }
992 :
993 : /*
994 : * Initialize or attach to the dynamic shared hash table that stores the
995 : * last-start times, if not already done.
996 : * This must be called before accessing the table.
997 : */
998 : static void
999 1010 : logicalrep_launcher_attach_dshmem(void)
1000 : {
1001 : MemoryContext oldcontext;
1002 :
1003 : /* Quick exit if we already did this. */
1004 1010 : if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
1005 908 : last_start_times != NULL)
1006 668 : return;
1007 :
1008 : /* Otherwise, use a lock to ensure only one process creates the table. */
1009 342 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
1010 :
1011 : /* Be sure any local memory allocated by DSA routines is persistent. */
1012 342 : oldcontext = MemoryContextSwitchTo(TopMemoryContext);
1013 :
1014 342 : if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
1015 : {
1016 : /* Initialize dynamic shared hash table for last-start times. */
1017 102 : last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
1018 102 : dsa_pin(last_start_times_dsa);
1019 102 : dsa_pin_mapping(last_start_times_dsa);
1020 102 : last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
1021 :
1022 : /* Store handles in shared memory for other backends to use. */
1023 102 : LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
1024 102 : LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
1025 : }
1026 240 : else if (!last_start_times)
1027 : {
1028 : /* Attach to existing dynamic shared hash table. */
1029 240 : last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
1030 240 : dsa_pin_mapping(last_start_times_dsa);
1031 240 : last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
1032 240 : LogicalRepCtx->last_start_dsh, NULL);
1033 : }
1034 :
1035 342 : MemoryContextSwitchTo(oldcontext);
1036 342 : LWLockRelease(LogicalRepWorkerLock);
1037 : }
1038 :
1039 : /*
1040 : * Set the last-start time for the subscription.
1041 : */
1042 : static void
1043 292 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
1044 : {
1045 : LauncherLastStartTimesEntry *entry;
1046 : bool found;
1047 :
1048 292 : logicalrep_launcher_attach_dshmem();
1049 :
1050 292 : entry = dshash_find_or_insert(last_start_times, &subid, &found);
1051 292 : entry->last_start_time = start_time;
1052 292 : dshash_release_lock(last_start_times, entry);
1053 292 : }
1054 :
1055 : /*
1056 : * Return the last-start time for the subscription, or 0 if there isn't one.
1057 : */
1058 : static TimestampTz
1059 416 : ApplyLauncherGetWorkerStartTime(Oid subid)
1060 : {
1061 : LauncherLastStartTimesEntry *entry;
1062 : TimestampTz ret;
1063 :
1064 416 : logicalrep_launcher_attach_dshmem();
1065 :
1066 416 : entry = dshash_find(last_start_times, &subid, false);
1067 416 : if (entry == NULL)
1068 248 : return 0;
1069 :
1070 168 : ret = entry->last_start_time;
1071 168 : dshash_release_lock(last_start_times, entry);
1072 :
1073 168 : return ret;
1074 : }
1075 :
1076 : /*
1077 : * Remove the last-start-time entry for the subscription, if one exists.
1078 : *
1079 : * This has two use-cases: to remove the entry related to a subscription
1080 : * that's been deleted or disabled (just to avoid leaking shared memory),
1081 : * and to allow immediate restart of an apply worker that has exited
1082 : * due to subscription parameter changes.
1083 : */
1084 : void
1085 302 : ApplyLauncherForgetWorkerStartTime(Oid subid)
1086 : {
1087 302 : logicalrep_launcher_attach_dshmem();
1088 :
1089 302 : (void) dshash_delete_key(last_start_times, &subid);
1090 302 : }
1091 :
1092 : /*
1093 : * Wake up the launcher on commit if requested.
1094 : */
1095 : void
1096 1036470 : AtEOXact_ApplyLauncher(bool isCommit)
1097 : {
1098 1036470 : if (isCommit)
1099 : {
1100 986828 : if (on_commit_launcher_wakeup)
1101 268 : ApplyLauncherWakeup();
1102 : }
1103 :
1104 1036470 : on_commit_launcher_wakeup = false;
1105 1036470 : }
1106 :
1107 : /*
1108 : * Request wakeup of the launcher on commit of the transaction.
1109 : *
1110 : * This is used to signal the launcher to stop sleeping and process the
1111 : * subscriptions when the current transaction commits. Should be used when
1112 : * a new tuple was added to the pg_subscription catalog.
1113 : */
1114 : void
1115 268 : ApplyLauncherWakeupAtCommit(void)
1116 : {
1117 268 : if (!on_commit_launcher_wakeup)
1118 268 : on_commit_launcher_wakeup = true;
1119 268 : }
1120 :
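 : /*
 :  * Wake the launcher process, if it is running, by sending it SIGUSR1.
 :  */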
1121 : static void
1122 1134 : ApplyLauncherWakeup(void)
1123 : {
1124 1134 : if (LogicalRepCtx->launcher_pid != 0)
1125 1098 : kill(LogicalRepCtx->launcher_pid, SIGUSR1);
1126 1134 : }
1127 :
1128 : /*
1129 : * Main loop for the apply launcher process.
1130 : */
1131 : void
1132 812 : ApplyLauncherMain(Datum main_arg)
1133 : {
1134 812 : ereport(DEBUG1,
1135 : (errmsg_internal("logical replication launcher started")));
1136 :
1137 812 : before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
1138 :
1139 : Assert(LogicalRepCtx->launcher_pid == 0);
1140 812 : LogicalRepCtx->launcher_pid = MyProcPid;
1141 :
1142 : /* Establish signal handlers. */
1143 812 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
1144 812 : pqsignal(SIGTERM, die);
1145 812 : BackgroundWorkerUnblockSignals();
1146 :
1147 : /*
1148 : * Establish connection to nailed catalogs (we only ever access
1149 : * pg_subscription).
1150 : */
1151 812 : BackgroundWorkerInitializeConnection(NULL, NULL, 0);
1152 :
1153 : /* Enter main loop */
1154 : for (;;)
1155 4440 : {
1156 : int rc;
1157 : List *sublist;
1158 : ListCell *lc;
1159 : MemoryContext subctx;
1160 : MemoryContext oldctx;
1161 5252 : long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
1162 :
1163 5252 : CHECK_FOR_INTERRUPTS();
1164 :
1165 : /* Use temporary context to avoid leaking memory across cycles. */
1166 5248 : subctx = AllocSetContextCreate(TopMemoryContext,
1167 : "Logical Replication Launcher sublist",
1168 : ALLOCSET_DEFAULT_SIZES);
1169 5248 : oldctx = MemoryContextSwitchTo(subctx);
1170 :
1171 : /* Start any missing workers for enabled subscriptions. */
1172 5248 : sublist = get_subscription_list();
1173 6526 : foreach(lc, sublist)
1174 : {
1175 1278 : Subscription *sub = (Subscription *) lfirst(lc);
1176 : LogicalRepWorker *w;
1177 : TimestampTz last_start;
1178 : TimestampTz now;
1179 : long elapsed;
1180 :
1181 1278 : if (!sub->enabled)
1182 56 : continue;
1183 :
1184 1222 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1185 1222 : w = logicalrep_worker_find(sub->oid, InvalidOid, false);
1186 1222 : LWLockRelease(LogicalRepWorkerLock);
1187 :
1188 1222 : if (w != NULL)
1189 806 : continue; /* worker is running already */
1190 :
1191 : /*
1192 : * If the worker is eligible to start now, launch it. Otherwise,
1193 : * adjust wait_time so that we'll wake up as soon as it can be
1194 : * started.
1195 : *
1196 : * Each subscription's apply worker can only be restarted once per
1197 : * wal_retrieve_retry_interval, so that errors do not cause us to
1198 : * repeatedly restart the worker as fast as possible. In cases
1199 : * where a restart is expected (e.g., subscription parameter
1200 : * changes), another process should remove the last-start entry
1201 : * for the subscription so that the worker can be restarted
1202 : * without waiting for wal_retrieve_retry_interval to elapse.
1203 : */
1204 416 : last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
1205 416 : now = GetCurrentTimestamp();
1206 416 : if (last_start == 0 ||
1207 168 : (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
1208 : {
1209 292 : ApplyLauncherSetWorkerStartTime(sub->oid, now);
1210 292 : if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
1211 292 : sub->dbid, sub->oid, sub->name,
1212 : sub->owner, InvalidOid,
1213 : DSM_HANDLE_INVALID))
1214 : {
1215 : /*
1216 : * We get here either if we failed to launch a worker
1217 : * (perhaps for resource-exhaustion reasons) or if we
1218 : * launched one but it immediately quit. Either way, it
1219 : * seems appropriate to try again after
1220 : * wal_retrieve_retry_interval.
1221 : */
1222 0 : wait_time = Min(wait_time,
1223 : wal_retrieve_retry_interval);
1224 : }
1225 : }
1226 : else
1227 : {
1228 124 : wait_time = Min(wait_time,
1229 : wal_retrieve_retry_interval - elapsed);
1230 : }
1231 : }
1232 :
1233 : /* Switch back to original memory context. */
1234 5248 : MemoryContextSwitchTo(oldctx);
1235 : /* Clean the temporary memory. */
1236 5248 : MemoryContextDelete(subctx);
1237 :
1238 : /* Wait for more work. */
1239 5248 : rc = WaitLatch(MyLatch,
1240 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1241 : wait_time,
1242 : WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
1243 :
1244 5242 : if (rc & WL_LATCH_SET)
1245 : {
1246 5218 : ResetLatch(MyLatch);
1247 5218 : CHECK_FOR_INTERRUPTS();
1248 : }
1249 :
1250 4440 : if (ConfigReloadPending)
1251 : {
1252 72 : ConfigReloadPending = false;
1253 72 : ProcessConfigFile(PGC_SIGHUP);
1254 : }
1255 : }
1256 :
1257 : /* Not reachable */
1258 : }
1259 :
1260 : /*
1261 : * Is current process the logical replication launcher?
1262 : */
1263 : bool
1264 1018 : IsLogicalLauncher(void)
1265 : {
1266 1018 : return LogicalRepCtx->launcher_pid == MyProcPid;
1267 : }
1268 :
1269 : /*
1270 : * Return the pid of the leader apply worker if the given pid is the pid of a
1271 : * parallel apply worker; otherwise, return InvalidPid.
1272 : */
1273 : pid_t
1274 1808 : GetLeaderApplyWorkerPid(pid_t pid)
1275 : {
1276 1808 : int leader_pid = InvalidPid;
1277 : int i;
1278 :
1279 1808 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1280 :
1281 9040 : for (i = 0; i < max_logical_replication_workers; i++)
1282 : {
1283 7232 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
1284 :
1285 7232 : if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
1286 : {
1287 0 : leader_pid = w->leader_pid;
1288 0 : break;
1289 : }
1290 : }
1291 :
1292 1808 : LWLockRelease(LogicalRepWorkerLock);
1293 :
1294 1808 : return leader_pid;
1295 : }
1296 :
1297 : /*
1298 : * Returns the state of the subscriptions.
1299 : */
1300 : Datum
1301 2 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
1302 : {
1303 : #define PG_STAT_GET_SUBSCRIPTION_COLS 10
1304 2 : Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
1305 : int i;
1306 2 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1307 :
1308 2 : InitMaterializedSRF(fcinfo, 0);
1309 :
1310 : /* Make sure we get consistent view of the workers. */
1311 2 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1312 :
1313 10 : for (i = 0; i < max_logical_replication_workers; i++)
1314 : {
1315 : /* for each row */
1316 8 : Datum values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1317 8 : bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1318 : int worker_pid;
1319 : LogicalRepWorker worker;
1320 :
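 : /* Copy the slot contents so we examine a consistent snapshot of the fields. */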
1321 8 : memcpy(&worker, &LogicalRepCtx->workers[i],
1322 : sizeof(LogicalRepWorker));
1323 8 : if (!worker.proc || !IsBackendPid(worker.proc->pid))
1324 4 : continue;
1325 :
1326 4 : if (OidIsValid(subid) && worker.subid != subid)
1327 0 : continue;
1328 :
1329 4 : worker_pid = worker.proc->pid;
1330 :
1331 4 : values[0] = ObjectIdGetDatum(worker.subid);
1332 4 : if (isTablesyncWorker(&worker))
1333 0 : values[1] = ObjectIdGetDatum(worker.relid);
1334 : else
1335 4 : nulls[1] = true;
1336 4 : values[2] = Int32GetDatum(worker_pid);
1337 :
1338 4 : if (isParallelApplyWorker(&worker))
1339 0 : values[3] = Int32GetDatum(worker.leader_pid);
1340 : else
1341 4 : nulls[3] = true;
1342 :
1343 4 : if (XLogRecPtrIsInvalid(worker.last_lsn))
1344 2 : nulls[4] = true;
1345 : else
1346 2 : values[4] = LSNGetDatum(worker.last_lsn);
1347 4 : if (worker.last_send_time == 0)
1348 0 : nulls[5] = true;
1349 : else
1350 4 : values[5] = TimestampTzGetDatum(worker.last_send_time);
1351 4 : if (worker.last_recv_time == 0)
1352 0 : nulls[6] = true;
1353 : else
1354 4 : values[6] = TimestampTzGetDatum(worker.last_recv_time);
1355 4 : if (XLogRecPtrIsInvalid(worker.reply_lsn))
1356 2 : nulls[7] = true;
1357 : else
1358 2 : values[7] = LSNGetDatum(worker.reply_lsn);
1359 4 : if (worker.reply_time == 0)
1360 0 : nulls[8] = true;
1361 : else
1362 4 : values[8] = TimestampTzGetDatum(worker.reply_time);
1363 :
1364 4 : switch (worker.type)
1365 : {
1366 4 : case WORKERTYPE_APPLY:
1367 4 : values[9] = CStringGetTextDatum("apply");
1368 4 : break;
1369 0 : case WORKERTYPE_PARALLEL_APPLY:
1370 0 : values[9] = CStringGetTextDatum("parallel apply");
1371 0 : break;
1372 0 : case WORKERTYPE_TABLESYNC:
1373 0 : values[9] = CStringGetTextDatum("table synchronization");
1374 0 : break;
1375 0 : case WORKERTYPE_UNKNOWN:
1376 : /* Should never happen. */
1377 0 : elog(ERROR, "unknown worker type");
1378 : }
1379 :
1380 4 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1381 : values, nulls);
1382 :
1383 : /*
1384 : * If only a single subscription was requested, and we found it,
1385 : * break.
1386 : */
1387 4 : if (OidIsValid(subid))
1388 0 : break;
1389 : }
1390 :
1391 2 : LWLockRelease(LogicalRepWorkerLock);
1392 :
1393 2 : return (Datum) 0;
1394 : }