1 : /*-------------------------------------------------------------------------
2 : * launcher.c
3 : * PostgreSQL logical replication worker launcher process
4 : *
5 : * Copyright (c) 2016-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/launcher.c
9 : *
10 : * NOTES
11 : * This module contains the logical replication worker launcher which
12 : * uses the background worker infrastructure to start the logical
13 : * replication workers for every enabled subscription.
14 : *
15 : *-------------------------------------------------------------------------
16 : */
17 :
18 : #include "postgres.h"
19 :
20 : #include "access/heapam.h"
21 : #include "access/htup.h"
22 : #include "access/htup_details.h"
23 : #include "access/tableam.h"
24 : #include "access/xact.h"
25 : #include "catalog/pg_subscription.h"
26 : #include "catalog/pg_subscription_rel.h"
27 : #include "funcapi.h"
28 : #include "lib/dshash.h"
29 : #include "miscadmin.h"
30 : #include "pgstat.h"
31 : #include "postmaster/bgworker.h"
32 : #include "postmaster/interrupt.h"
33 : #include "replication/logicallauncher.h"
34 : #include "replication/slot.h"
35 : #include "replication/walreceiver.h"
36 : #include "replication/worker_internal.h"
37 : #include "storage/ipc.h"
38 : #include "storage/proc.h"
39 : #include "storage/procarray.h"
40 : #include "tcop/tcopprot.h"
41 : #include "utils/builtins.h"
42 : #include "utils/memutils.h"
43 : #include "utils/pg_lsn.h"
44 : #include "utils/snapmgr.h"
45 :
46 : /* max sleep time between cycles (3min) */
47 : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
48 :
49 : /* GUC variables */
50 : int max_logical_replication_workers = 4;
51 : int max_sync_workers_per_subscription = 2;
52 : int max_parallel_apply_workers_per_subscription = 2;
53 :
54 : LogicalRepWorker *MyLogicalRepWorker = NULL;
55 :
56 : typedef struct LogicalRepCtxStruct
57 : {
58 : /* Supervisor process. */
59 : pid_t launcher_pid;
60 :
61 : /* Hash table holding last start times of subscriptions' apply workers. */
62 : dsa_handle last_start_dsa;
63 : dshash_table_handle last_start_dsh;
64 :
65 : /* Background workers. */
66 : LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
67 : } LogicalRepCtxStruct;
68 :
69 : static LogicalRepCtxStruct *LogicalRepCtx;
70 :
71 : /* an entry in the last-start-times shared hash table */
72 : typedef struct LauncherLastStartTimesEntry
73 : {
74 : Oid subid; /* OID of logrep subscription (hash key) */
75 : TimestampTz last_start_time; /* last time its apply worker was started */
76 : } LauncherLastStartTimesEntry;
77 :
78 : /* parameters for the last-start-times shared hash table */
79 : static const dshash_parameters dsh_params = {
80 : sizeof(Oid),
81 : sizeof(LauncherLastStartTimesEntry),
82 : dshash_memcmp,
83 : dshash_memhash,
84 : dshash_memcpy,
85 : LWTRANCHE_LAUNCHER_HASH
86 : };
87 :
88 : static dsa_area *last_start_times_dsa = NULL;
89 : static dshash_table *last_start_times = NULL;
90 :
91 : static bool on_commit_launcher_wakeup = false;
92 :
93 :
94 : static void ApplyLauncherWakeup(void);
95 : static void logicalrep_launcher_onexit(int code, Datum arg);
96 : static void logicalrep_worker_onexit(int code, Datum arg);
97 : static void logicalrep_worker_detach(void);
98 : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
99 : static int logicalrep_pa_worker_count(Oid subid);
100 : static void logicalrep_launcher_attach_dshmem(void);
101 : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
102 : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
103 :
104 :
105 : /*
106 : * Load the list of subscriptions.
107 : *
108 : * Only the fields of interest to the worker start/stop functions are filled
109 : * in for each subscription.
110 : */
111 : static List *
112 4474 : get_subscription_list(void)
113 : {
114 4474 : List *res = NIL;
115 : Relation rel;
116 : TableScanDesc scan;
117 : HeapTuple tup;
118 : MemoryContext resultcxt;
119 :
120 : /* This is the context that we will allocate our output data in */
121 4474 : resultcxt = CurrentMemoryContext;
122 :
123 : /*
124 : * Start a transaction so we can access pg_subscription.
125 : */
126 4474 : StartTransactionCommand();
127 :
128 4474 : rel = table_open(SubscriptionRelationId, AccessShareLock);
129 4474 : scan = table_beginscan_catalog(rel, 0, NULL);
130 :
131 5264 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
132 : {
133 790 : Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
134 : Subscription *sub;
135 : MemoryContext oldcxt;
136 :
137 : /*
138 : * Allocate our results in the caller's context, not the
139 : * transaction's. We do this inside the loop, and restore the original
140 : * context at the end, so that leaky things like heap_getnext() are
141 : * not called in a potentially long-lived context.
142 : */
143 790 : oldcxt = MemoryContextSwitchTo(resultcxt);
144 :
145 790 : sub = (Subscription *) palloc0(sizeof(Subscription));
146 790 : sub->oid = subform->oid;
147 790 : sub->dbid = subform->subdbid;
148 790 : sub->owner = subform->subowner;
149 790 : sub->enabled = subform->subenabled;
150 790 : sub->name = pstrdup(NameStr(subform->subname));
151 : /* We don't fill fields we are not interested in. */
152 :
153 790 : res = lappend(res, sub);
154 790 : MemoryContextSwitchTo(oldcxt);
155 : }
156 :
157 4474 : table_endscan(scan);
158 4474 : table_close(rel, AccessShareLock);
159 :
160 4474 : CommitTransactionCommand();
161 :
162 4474 : return res;
163 : }
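The context switching above is worth spelling out: CommitTransactionCommand() destroys the transaction's memory context, so anything meant for the caller must be allocated in the caller's context before the commit. A minimal sketch of the pattern, illustrative only and assuming this file's includes:

	List	   *result = NIL;
	MemoryContext resultcxt = CurrentMemoryContext;	/* caller's context */

	StartTransactionCommand();	/* switches into a transaction context */

	{
		MemoryContext oldcxt = MemoryContextSwitchTo(resultcxt);

		/* Anything palloc'd here survives the commit below. */
		result = lappend(result, palloc0(sizeof(Subscription)));
		MemoryContextSwitchTo(oldcxt);
	}

	CommitTransactionCommand();	/* destroys the transaction's context */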
164 :
165 : /*
166 : * Wait for a background worker to start up and attach to the shmem context.
167 : *
168 : * This is only needed for cleaning up the shared memory in case the worker
169 : * fails to attach.
170 : *
171 : * Returns whether the attach was successful.
172 : */
173 : static bool
174 3382 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
175 : uint16 generation,
176 : BackgroundWorkerHandle *handle)
177 : {
178 : BgwHandleStatus status;
179 : int rc;
180 :
181 : for (;;)
182 2728 : {
183 : pid_t pid;
184 :
185 3382 : CHECK_FOR_INTERRUPTS();
186 :
187 3382 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
188 :
189 : /* Worker either died or has started. Return false if died. */
190 3382 : if (!worker->in_use || worker->proc)
191 : {
192 652 : LWLockRelease(LogicalRepWorkerLock);
193 652 : return worker->in_use;
194 : }
195 :
196 2730 : LWLockRelease(LogicalRepWorkerLock);
197 :
198 : /* Check if worker has died before attaching, and clean up after it. */
199 2730 : status = GetBackgroundWorkerPid(handle, &pid);
200 :
201 2730 : if (status == BGWH_STOPPED)
202 : {
203 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
204 : /* Ensure that this was indeed the worker we waited for. */
205 0 : if (generation == worker->generation)
206 0 : logicalrep_worker_cleanup(worker);
207 0 : LWLockRelease(LogicalRepWorkerLock);
208 0 : return false;
209 : }
210 :
211 : /*
212 : * We need a timeout because we generally don't get notified via latch
213 : * about the worker attach. But we don't expect to have to wait long.
214 : */
215 2730 : rc = WaitLatch(MyLatch,
216 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
217 : 10L, WAIT_EVENT_BGWORKER_STARTUP);
218 :
219 2730 : if (rc & WL_LATCH_SET)
220 : {
221 838 : ResetLatch(MyLatch);
222 838 : CHECK_FOR_INTERRUPTS();
223 : }
224 : }
225 : }
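The body of this function is an instance of the stock latch-polling idiom: check the condition under the appropriate lock, then WaitLatch() with a short timeout because nothing is guaranteed to set our latch. Condensed into a sketch, where condition_is_met() is a hypothetical placeholder:

	for (;;)
	{
		int			rc;

		CHECK_FOR_INTERRUPTS();

		if (condition_is_met())	/* hypothetical predicate */
			break;

		/* WL_EXIT_ON_PM_DEATH makes postmaster death terminate the wait. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   10L, WAIT_EVENT_BGWORKER_STARTUP);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}
	}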
226 :
227 : /*
228 : * Walks the workers array and searches for one that matches the given
229 : * subscription id and relid.
230 : *
231 : * We are only interested in the leader apply worker or table sync worker.
232 : */
233 : LogicalRepWorker *
234 4452 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
235 : {
236 : int i;
237 4452 : LogicalRepWorker *res = NULL;
238 :
239 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
240 :
241 : /* Search for attached worker for a given subscription id. */
242 13576 : for (i = 0; i < max_logical_replication_workers; i++)
243 : {
244 11892 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
245 :
246 : /* Skip parallel apply workers. */
247 11892 : if (isParallelApplyWorker(w))
248 0 : continue;
249 :
250 11892 : if (w->in_use && w->subid == subid && w->relid == relid &&
251 2768 : (!only_running || w->proc))
252 : {
253 2768 : res = w;
254 2768 : break;
255 : }
256 : }
257 :
258 4452 : return res;
259 : }
260 :
261 : /*
262 : * Similar to logicalrep_worker_find(), but returns a list of all workers for
263 : * the subscription, instead of just one.
264 : */
265 : List *
266 978 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
267 : {
268 : int i;
269 978 : List *res = NIL;
270 :
271 978 : if (acquire_lock)
272 200 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
273 :
274 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
275 :
276 : /* Search for attached worker for a given subscription id. */
277 5086 : for (i = 0; i < max_logical_replication_workers; i++)
278 : {
279 4108 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
280 :
281 4108 : if (w->in_use && w->subid == subid && (!only_running || w->proc))
282 654 : res = lappend(res, w);
283 : }
284 :
285 978 : if (acquire_lock)
286 200 : LWLockRelease(LogicalRepWorkerLock);
287 :
288 978 : return res;
289 : }
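A sketch of how callers typically consume this list, approximating the DROP SUBSCRIPTION code path (the real call site may differ in detail): take the lock only around the lookup, then stop each worker by subid/relid.

	List	   *subworkers;
	ListCell   *lc;

	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
	subworkers = logicalrep_workers_find(subid, false, false);
	LWLockRelease(LogicalRepWorkerLock);

	foreach(lc, subworkers)
	{
		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);

		logicalrep_worker_stop(w->subid, w->relid);
	}
	list_free(subworkers);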
290 :
291 : /*
292 : * Start a new logical replication background worker, if possible.
293 : *
294 : * Returns true on success, false on failure.
295 : */
296 : bool
297 654 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
298 : Oid dbid, Oid subid, const char *subname, Oid userid,
299 : Oid relid, dsm_handle subworker_dsm)
300 : {
301 : BackgroundWorker bgw;
302 : BackgroundWorkerHandle *bgw_handle;
303 : uint16 generation;
304 : int i;
305 654 : int slot = 0;
306 654 : LogicalRepWorker *worker = NULL;
307 : int nsyncworkers;
308 : int nparallelapplyworkers;
309 : TimestampTz now;
310 654 : bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
311 654 : bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
312 :
313 : /*----------
314 : * Sanity checks:
315 : * - must be valid worker type
316 : * - tablesync workers are the only ones to have relid
317 : * - parallel apply worker is the only kind of subworker
318 : */
319 : Assert(wtype != WORKERTYPE_UNKNOWN);
320 : Assert(is_tablesync_worker == OidIsValid(relid));
321 : Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
322 :
323 654 : ereport(DEBUG1,
324 : (errmsg_internal("starting logical replication worker for subscription \"%s\"",
325 : subname)));
326 :
327 : /* Report this after the initial starting message for consistency. */
328 654 : if (max_replication_slots == 0)
329 0 : ereport(ERROR,
330 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
331 : errmsg("cannot start logical replication workers when \"max_replication_slots\"=0")));
332 :
333 : /*
334 : * We need to modify the shared memory under lock so that we have a
335 : * consistent view.
336 : */
337 654 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
338 :
339 654 : retry:
340 : /* Find unused worker slot. */
341 1184 : for (i = 0; i < max_logical_replication_workers; i++)
342 : {
343 1184 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
344 :
345 1184 : if (!w->in_use)
346 : {
347 654 : worker = w;
348 654 : slot = i;
349 654 : break;
350 : }
351 : }
352 :
353 654 : nsyncworkers = logicalrep_sync_worker_count(subid);
354 :
355 654 : now = GetCurrentTimestamp();
356 :
357 : /*
358 : * If we didn't find a free slot, try to do garbage collection: if some
359 : * worker failed to start up and its parent crashed while waiting, the
360 : * in_use state was never cleared, so the slot can be reclaimed here.
361 : */
362 654 : if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
363 : {
364 0 : bool did_cleanup = false;
365 :
366 0 : for (i = 0; i < max_logical_replication_workers; i++)
367 : {
368 0 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
369 :
370 : /*
371 : * If the worker was marked in use but didn't manage to attach in
372 : * time, clean it up.
373 : */
374 0 : if (w->in_use && !w->proc &&
375 0 : TimestampDifferenceExceeds(w->launch_time, now,
376 : wal_receiver_timeout))
377 : {
378 0 : elog(WARNING,
379 : "logical replication worker for subscription %u took too long to start; canceled",
380 : w->subid);
381 :
382 0 : logicalrep_worker_cleanup(w);
383 0 : did_cleanup = true;
384 : }
385 : }
386 :
387 0 : if (did_cleanup)
388 0 : goto retry;
389 : }
390 :
391 : /*
392 : * We don't allow invoking more sync workers once we have reached the
393 : * sync worker limit per subscription. So, just return silently as we
394 : * might get here because of an otherwise harmless race condition.
395 : */
396 654 : if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
397 : {
398 0 : LWLockRelease(LogicalRepWorkerLock);
399 0 : return false;
400 : }
401 :
402 654 : nparallelapplyworkers = logicalrep_pa_worker_count(subid);
403 :
404 : /*
405 : * Return false if the number of parallel apply workers has reached the limit
406 : * per subscription.
407 : */
408 654 : if (is_parallel_apply_worker &&
409 20 : nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
410 : {
411 0 : LWLockRelease(LogicalRepWorkerLock);
412 0 : return false;
413 : }
414 :
415 : /*
416 : * However, if there are no free worker slots, inform the user about it
417 : * before exiting.
418 : */
419 654 : if (worker == NULL)
420 : {
421 0 : LWLockRelease(LogicalRepWorkerLock);
422 0 : ereport(WARNING,
423 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
424 : errmsg("out of logical replication worker slots"),
425 : errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
426 0 : return false;
427 : }
428 :
429 : /* Prepare the worker slot. */
430 654 : worker->type = wtype;
431 654 : worker->launch_time = now;
432 654 : worker->in_use = true;
433 654 : worker->generation++;
434 654 : worker->proc = NULL;
435 654 : worker->dbid = dbid;
436 654 : worker->userid = userid;
437 654 : worker->subid = subid;
438 654 : worker->relid = relid;
439 654 : worker->relstate = SUBREL_STATE_UNKNOWN;
440 654 : worker->relstate_lsn = InvalidXLogRecPtr;
441 654 : worker->stream_fileset = NULL;
442 654 : worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
443 654 : worker->parallel_apply = is_parallel_apply_worker;
444 654 : worker->last_lsn = InvalidXLogRecPtr;
445 654 : TIMESTAMP_NOBEGIN(worker->last_send_time);
446 654 : TIMESTAMP_NOBEGIN(worker->last_recv_time);
447 654 : worker->reply_lsn = InvalidXLogRecPtr;
448 654 : TIMESTAMP_NOBEGIN(worker->reply_time);
449 :
450 : /* Before releasing lock, remember generation for future identification. */
451 654 : generation = worker->generation;
452 :
453 654 : LWLockRelease(LogicalRepWorkerLock);
454 :
455 : /* Register the new dynamic worker. */
456 654 : memset(&bgw, 0, sizeof(bgw));
457 654 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
458 : BGWORKER_BACKEND_DATABASE_CONNECTION;
459 654 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
460 654 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
461 :
462 654 : switch (worker->type)
463 : {
464 268 : case WORKERTYPE_APPLY:
465 268 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
466 268 : snprintf(bgw.bgw_name, BGW_MAXLEN,
467 : "logical replication apply worker for subscription %u",
468 : subid);
469 268 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
470 268 : break;
471 :
472 20 : case WORKERTYPE_PARALLEL_APPLY:
473 20 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
474 20 : snprintf(bgw.bgw_name, BGW_MAXLEN,
475 : "logical replication parallel apply worker for subscription %u",
476 : subid);
477 20 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
478 :
479 20 : memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
480 20 : break;
481 :
482 366 : case WORKERTYPE_TABLESYNC:
483 366 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
484 366 : snprintf(bgw.bgw_name, BGW_MAXLEN,
485 : "logical replication tablesync worker for subscription %u sync %u",
486 : subid,
487 : relid);
488 366 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
489 366 : break;
490 :
491 0 : case WORKERTYPE_UNKNOWN:
492 : /* Should never happen. */
493 0 : elog(ERROR, "unknown worker type");
494 : }
495 :
496 654 : bgw.bgw_restart_time = BGW_NEVER_RESTART;
497 654 : bgw.bgw_notify_pid = MyProcPid;
498 654 : bgw.bgw_main_arg = Int32GetDatum(slot);
499 :
500 654 : if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
501 : {
502 : /* Failed to start worker, so clean up the worker slot. */
503 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
504 : Assert(generation == worker->generation);
505 0 : logicalrep_worker_cleanup(worker);
506 0 : LWLockRelease(LogicalRepWorkerLock);
507 :
508 0 : ereport(WARNING,
509 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
510 : errmsg("out of background worker slots"),
511 : errhint("You might need to increase \"%s\".", "max_worker_processes")));
512 0 : return false;
513 : }
514 :
515 : /* Now wait until it attaches. */
516 654 : return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
517 : }
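On the other end, bgw_main_arg carries the slot index into the new process, whose entry point claims the slot via logicalrep_worker_attach(). A hypothetical worker main, sketched only to show how the Datum round-trips:

	void
	WorkerMainSketch(Datum main_arg)	/* hypothetical entry point */
	{
		int			slot = DatumGetInt32(main_arg);

		/* Claims LogicalRepCtx->workers[slot] and sets MyLogicalRepWorker. */
		logicalrep_worker_attach(slot);

		/* ... worker-type-specific setup and main loop would follow ... */
	}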
518 :
519 : /*
520 : * Internal function to stop the worker and wait until it detaches from the
521 : * slot.
522 : */
523 : static void
524 144 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
525 : {
526 : uint16 generation;
527 :
528 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
529 :
530 : /*
531 : * Remember the worker's generation so we can later check whether the
532 : * slot still holds the same worker.
533 : */
534 144 : generation = worker->generation;
535 :
536 : /*
537 : * If we found a worker but it does not have proc set then it is still
538 : * starting up; wait for it to finish starting and then kill it.
539 : */
540 158 : while (worker->in_use && !worker->proc)
541 : {
542 : int rc;
543 :
544 20 : LWLockRelease(LogicalRepWorkerLock);
545 :
546 : /* Wait a bit --- we don't expect to have to wait long. */
547 20 : rc = WaitLatch(MyLatch,
548 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
549 : 10L, WAIT_EVENT_BGWORKER_STARTUP);
550 :
551 20 : if (rc & WL_LATCH_SET)
552 : {
553 0 : ResetLatch(MyLatch);
554 0 : CHECK_FOR_INTERRUPTS();
555 : }
556 :
557 : /* Recheck worker status. */
558 20 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
559 :
560 : /*
561 : * Check whether the worker slot is no longer used, which would mean
562 : * that the worker has exited, or whether the worker generation is
563 : * different, meaning that a different worker has taken the slot.
564 : */
565 20 : if (!worker->in_use || worker->generation != generation)
566 0 : return;
567 :
568 : /* Worker has assigned proc, so it has started. */
569 20 : if (worker->proc)
570 6 : break;
571 : }
572 :
573 : /* Now terminate the worker ... */
574 144 : kill(worker->proc->pid, signo);
575 :
576 : /* ... and wait for it to die. */
577 : for (;;)
578 180 : {
579 : int rc;
580 :
581 : /* is it gone? */
582 324 : if (!worker->proc || worker->generation != generation)
583 : break;
584 :
585 180 : LWLockRelease(LogicalRepWorkerLock);
586 :
587 : /* Wait a bit --- we don't expect to have to wait long. */
588 180 : rc = WaitLatch(MyLatch,
589 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
590 : 10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
591 :
592 180 : if (rc & WL_LATCH_SET)
593 : {
594 34 : ResetLatch(MyLatch);
595 34 : CHECK_FOR_INTERRUPTS();
596 : }
597 :
598 180 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
599 : }
600 : }
601 :
602 : /*
603 : * Stop the logical replication worker for subid/relid, if any.
604 : */
605 : void
606 164 : logicalrep_worker_stop(Oid subid, Oid relid)
607 : {
608 : LogicalRepWorker *worker;
609 :
610 164 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
611 :
612 164 : worker = logicalrep_worker_find(subid, relid, false);
613 :
614 164 : if (worker)
615 : {
616 : Assert(!isParallelApplyWorker(worker));
617 128 : logicalrep_worker_stop_internal(worker, SIGTERM);
618 : }
619 :
620 164 : LWLockRelease(LogicalRepWorkerLock);
621 164 : }
622 :
623 : /*
624 : * Stop the given logical replication parallel apply worker.
625 : *
626 : * Note that the function sends SIGINT instead of SIGTERM to the parallel apply
627 : * worker so that the worker exits cleanly.
628 : */
629 : void
630 10 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
631 : {
632 : int slot_no;
633 : uint16 generation;
634 : LogicalRepWorker *worker;
635 :
636 10 : SpinLockAcquire(&winfo->shared->mutex);
637 10 : generation = winfo->shared->logicalrep_worker_generation;
638 10 : slot_no = winfo->shared->logicalrep_worker_slot_no;
639 10 : SpinLockRelease(&winfo->shared->mutex);
640 :
641 : Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
642 :
643 : /*
644 : * Detach from the error_mq_handle for the parallel apply worker before
645 : * stopping it. This prevents the leader apply worker from trying to
646 : * receive a message from an error queue that the parallel apply worker
647 : * might already have detached.
648 : */
649 10 : if (winfo->error_mq_handle)
650 : {
651 10 : shm_mq_detach(winfo->error_mq_handle);
652 10 : winfo->error_mq_handle = NULL;
653 : }
654 :
655 10 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
656 :
657 10 : worker = &LogicalRepCtx->workers[slot_no];
658 : Assert(isParallelApplyWorker(worker));
659 :
660 : /*
661 : * Only stop the worker if the generation matches and the worker is alive.
662 : */
663 10 : if (worker->generation == generation && worker->proc)
664 10 : logicalrep_worker_stop_internal(worker, SIGINT);
665 :
666 10 : LWLockRelease(LogicalRepWorkerLock);
667 10 : }
668 :
669 : /*
670 : * Wake up (using latch) any logical replication worker for specified sub/rel.
671 : */
672 : void
673 402 : logicalrep_worker_wakeup(Oid subid, Oid relid)
674 : {
675 : LogicalRepWorker *worker;
676 :
677 402 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
678 :
679 402 : worker = logicalrep_worker_find(subid, relid, true);
680 :
681 402 : if (worker)
682 402 : logicalrep_worker_wakeup_ptr(worker);
683 :
684 402 : LWLockRelease(LogicalRepWorkerLock);
685 402 : }
686 :
687 : /*
688 : * Wake up (using latch) the specified logical replication worker.
689 : *
690 : * Caller must hold lock, else worker->proc could change under us.
691 : */
692 : void
693 1196 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
694 : {
695 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
696 :
697 1196 : SetLatch(&worker->proc->procLatch);
698 1196 : }
699 :
700 : /*
701 : * Attach to a slot.
702 : */
703 : void
704 806 : logicalrep_worker_attach(int slot)
705 : {
706 : /* Block concurrent access. */
707 806 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
708 :
709 : Assert(slot >= 0 && slot < max_logical_replication_workers);
710 806 : MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
711 :
712 806 : if (!MyLogicalRepWorker->in_use)
713 : {
714 0 : LWLockRelease(LogicalRepWorkerLock);
715 0 : ereport(ERROR,
716 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
717 : errmsg("logical replication worker slot %d is empty, cannot attach",
718 : slot)));
719 : }
720 :
721 806 : if (MyLogicalRepWorker->proc)
722 : {
723 0 : LWLockRelease(LogicalRepWorkerLock);
724 0 : ereport(ERROR,
725 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
726 : errmsg("logical replication worker slot %d is already used by "
727 : "another worker, cannot attach", slot)));
728 : }
729 :
730 806 : MyLogicalRepWorker->proc = MyProc;
731 806 : before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
732 :
733 806 : LWLockRelease(LogicalRepWorkerLock);
734 806 : }
735 :
736 : /*
737 : * Stop the parallel apply workers if any, and detach the leader apply worker
738 : * (cleans up the worker info).
739 : */
740 : static void
741 806 : logicalrep_worker_detach(void)
742 : {
743 : /* Stop the parallel apply workers. */
744 806 : if (am_leader_apply_worker())
745 : {
746 : List *workers;
747 : ListCell *lc;
748 :
749 : /*
750 : * Detach from the error_mq_handle for all parallel apply workers
751 : * before terminating them. This prevents the leader apply worker from
752 : * receiving the worker termination message and writing it to the log
753 : * when the parallel worker has already done so itself.
754 : */
755 416 : pa_detach_all_error_mq();
756 :
757 416 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
758 :
759 416 : workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
760 842 : foreach(lc, workers)
761 : {
762 426 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
763 :
764 426 : if (isParallelApplyWorker(w))
765 6 : logicalrep_worker_stop_internal(w, SIGTERM);
766 : }
767 :
768 416 : LWLockRelease(LogicalRepWorkerLock);
769 : }
770 :
771 : /* Block concurrent access. */
772 806 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
773 :
774 806 : logicalrep_worker_cleanup(MyLogicalRepWorker);
775 :
776 806 : LWLockRelease(LogicalRepWorkerLock);
777 806 : }
778 :
779 : /*
780 : * Clean up worker info.
781 : */
782 : static void
783 806 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
784 : {
785 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
786 :
787 806 : worker->type = WORKERTYPE_UNKNOWN;
788 806 : worker->in_use = false;
789 806 : worker->proc = NULL;
790 806 : worker->dbid = InvalidOid;
791 806 : worker->userid = InvalidOid;
792 806 : worker->subid = InvalidOid;
793 806 : worker->relid = InvalidOid;
794 806 : worker->leader_pid = InvalidPid;
795 806 : worker->parallel_apply = false;
796 806 : }
797 :
798 : /*
799 : * Cleanup function for logical replication launcher.
800 : *
801 : * Called on logical replication launcher exit.
802 : */
803 : static void
804 746 : logicalrep_launcher_onexit(int code, Datum arg)
805 : {
806 746 : LogicalRepCtx->launcher_pid = 0;
807 746 : }
808 :
809 : /*
810 : * Cleanup function.
811 : *
812 : * Called on logical replication worker exit.
813 : */
814 : static void
815 806 : logicalrep_worker_onexit(int code, Datum arg)
816 : {
817 : /* Disconnect gracefully from the remote side. */
818 806 : if (LogRepWorkerWalRcvConn)
819 726 : walrcv_disconnect(LogRepWorkerWalRcvConn);
820 :
821 806 : logicalrep_worker_detach();
822 :
823 : /* Cleanup fileset used for streaming transactions. */
824 806 : if (MyLogicalRepWorker->stream_fileset != NULL)
825 28 : FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
826 :
827 : /*
828 : * Session level locks may be acquired outside of a transaction in
829 : * parallel apply mode and will not be released when the worker
830 : * terminates, so manually release all locks before the worker exits.
831 : *
832 : * The locks will be acquired once the worker is initialized.
833 : */
834 806 : if (!InitializingApplyWorker)
835 798 : LockReleaseAll(DEFAULT_LOCKMETHOD, true);
836 :
837 806 : ApplyLauncherWakeup();
838 806 : }
839 :
840 : /*
841 : * Count the number of registered (not necessarily running) sync workers
842 : * for a subscription.
843 : */
844 : int
845 1992 : logicalrep_sync_worker_count(Oid subid)
846 : {
847 : int i;
848 1992 : int res = 0;
849 :
850 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
851 :
852 : /* Search for attached worker for a given subscription id. */
853 10296 : for (i = 0; i < max_logical_replication_workers; i++)
854 : {
855 8304 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
856 :
857 8304 : if (isTablesyncWorker(w) && w->subid == subid)
858 2172 : res++;
859 : }
860 :
861 1992 : return res;
862 : }
863 :
864 : /*
865 : * Count the number of registered (but not necessarily running) parallel apply
866 : * workers for a subscription.
867 : */
868 : static int
869 654 : logicalrep_pa_worker_count(Oid subid)
870 : {
871 : int i;
872 654 : int res = 0;
873 :
874 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
875 :
876 : /*
877 : * Scan all attached parallel apply workers, only counting those which
878 : * have the given subscription id.
879 : */
880 3458 : for (i = 0; i < max_logical_replication_workers; i++)
881 : {
882 2804 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
883 :
884 2804 : if (isParallelApplyWorker(w) && w->subid == subid)
885 4 : res++;
886 : }
887 :
888 654 : return res;
889 : }
890 :
891 : /*
892 : * ApplyLauncherShmemSize
893 : * Compute space needed for replication launcher shared memory
894 : */
895 : Size
896 7402 : ApplyLauncherShmemSize(void)
897 : {
898 : Size size;
899 :
900 : /*
901 : * Need the fixed struct and the array of LogicalRepWorker.
902 : */
903 7402 : size = sizeof(LogicalRepCtxStruct);
904 7402 : size = MAXALIGN(size);
905 7402 : size = add_size(size, mul_size(max_logical_replication_workers,
906 : sizeof(LogicalRepWorker)));
907 7402 : return size;
908 : }
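add_size() and mul_size() are the overflow-checked Size arithmetic routines used for all shmem sizing; they ereport(ERROR) on overflow instead of silently wrapping. The same shape for a hypothetical structure, with MyCtx/MyEntry as illustrative types:

	typedef struct MyEntry { int value; } MyEntry;	/* illustrative */
	typedef struct MyCtx
	{
		pid_t		owner_pid;
		MyEntry		entries[FLEXIBLE_ARRAY_MEMBER];
	} MyCtx;

	static Size
	MyCtxShmemSize(int nentries)
	{
		Size		size = MAXALIGN(sizeof(MyCtx));

		/* add_size()/mul_size() error out on overflow rather than wrap. */
		return add_size(size, mul_size(nentries, sizeof(MyEntry)));
	}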
909 :
910 : /*
911 : * ApplyLauncherRegister
912 : * Register a background worker running the logical replication launcher.
913 : */
914 : void
915 1544 : ApplyLauncherRegister(void)
916 : {
917 : BackgroundWorker bgw;
918 :
919 : /*
920 : * The logical replication launcher is disabled during binary upgrades, to
921 : * prevent logical replication workers from running on the source cluster.
922 : * That could cause replication origins to move forward after having been
923 : * copied to the target cluster, potentially creating conflicts with the
924 : * copied data files.
925 : */
926 1544 : if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
927 54 : return;
928 :
929 1490 : memset(&bgw, 0, sizeof(bgw));
930 1490 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
931 : BGWORKER_BACKEND_DATABASE_CONNECTION;
932 1490 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
933 1490 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
934 1490 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
935 1490 : snprintf(bgw.bgw_name, BGW_MAXLEN,
936 : "logical replication launcher");
937 1490 : snprintf(bgw.bgw_type, BGW_MAXLEN,
938 : "logical replication launcher");
939 1490 : bgw.bgw_restart_time = 5;
940 1490 : bgw.bgw_notify_pid = 0;
941 1490 : bgw.bgw_main_arg = (Datum) 0;
942 :
943 1490 : RegisterBackgroundWorker(&bgw);
944 : }
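RegisterBackgroundWorker() only works while the postmaster is starting up, which is why the launcher is registered this way from core. An extension loaded via shared_preload_libraries would do essentially the same from its _PG_init(); a sketch with hypothetical library and function names:

	void
	_PG_init(void)
	{
		BackgroundWorker bgw;

		memset(&bgw, 0, sizeof(bgw));
		bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
		bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
		snprintf(bgw.bgw_library_name, MAXPGPATH, "my_extension");	/* hypothetical */
		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "my_worker_main");	/* hypothetical */
		snprintf(bgw.bgw_name, BGW_MAXLEN, "my extension worker");
		snprintf(bgw.bgw_type, BGW_MAXLEN, "my extension worker");
		bgw.bgw_restart_time = 5;	/* restart 5s after a crash */
		bgw.bgw_notify_pid = 0;
		bgw.bgw_main_arg = (Datum) 0;

		RegisterBackgroundWorker(&bgw);
	}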
945 :
946 : /*
947 : * ApplyLauncherShmemInit
948 : * Allocate and initialize replication launcher shared memory
949 : */
950 : void
951 1918 : ApplyLauncherShmemInit(void)
952 : {
953 : bool found;
954 :
955 1918 : LogicalRepCtx = (LogicalRepCtxStruct *)
956 1918 : ShmemInitStruct("Logical Replication Launcher Data",
957 : ApplyLauncherShmemSize(),
958 : &found);
959 :
960 1918 : if (!found)
961 : {
962 : int slot;
963 :
964 1918 : memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
965 :
966 1918 : LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
967 1918 : LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
968 :
969 : /* Initialize memory and spin locks for each worker slot. */
970 9532 : for (slot = 0; slot < max_logical_replication_workers; slot++)
971 : {
972 7614 : LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
973 :
974 7614 : memset(worker, 0, sizeof(LogicalRepWorker));
975 7614 : SpinLockInit(&worker->relmutex);
976 : }
977 : }
978 1918 : }
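The !found branch is the canonical ShmemInitStruct() idiom: exactly one process (the first caller) sees found == false and initializes the struct; every later caller just attaches. A skeleton with hypothetical names:

	typedef struct MyShared		/* hypothetical */
	{
		slock_t		mutex;
		int			counter;
	} MyShared;

	static MyShared *MySharedState;

	static void
	MySharedShmemInit(void)		/* hypothetical */
	{
		bool		found;

		MySharedState = (MyShared *)
			ShmemInitStruct("My Shared Data", sizeof(MyShared), &found);

		if (!found)
		{
			/* We won the race to create it; initialize everything. */
			memset(MySharedState, 0, sizeof(MyShared));
			SpinLockInit(&MySharedState->mutex);
		}
	}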
979 :
980 : /*
981 : * Initialize or attach to the dynamic shared hash table that stores the
982 : * last-start times, if not already done.
983 : * This must be called before accessing the table.
984 : */
985 : static void
986 860 : logicalrep_launcher_attach_dshmem(void)
987 : {
988 : MemoryContext oldcontext;
989 :
990 : /* Quick exit if we already did this. */
991 860 : if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
992 762 : last_start_times != NULL)
993 540 : return;
994 :
995 : /* Otherwise, use a lock to ensure only one process creates the table. */
996 320 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
997 :
998 : /* Be sure any local memory allocated by DSA routines is persistent. */
999 320 : oldcontext = MemoryContextSwitchTo(TopMemoryContext);
1000 :
1001 320 : if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
1002 : {
1003 : /* Initialize dynamic shared hash table for last-start times. */
1004 98 : last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
1005 98 : dsa_pin(last_start_times_dsa);
1006 98 : dsa_pin_mapping(last_start_times_dsa);
1007 98 : last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
1008 :
1009 : /* Store handles in shared memory for other backends to use. */
1010 98 : LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
1011 98 : LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
1012 : }
1013 222 : else if (!last_start_times)
1014 : {
1015 : /* Attach to existing dynamic shared hash table. */
1016 222 : last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
1017 222 : dsa_pin_mapping(last_start_times_dsa);
1018 222 : last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
1019 222 : LogicalRepCtx->last_start_dsh, 0);
1020 : }
1021 :
1022 320 : MemoryContextSwitchTo(oldcontext);
1023 320 : LWLockRelease(LogicalRepWorkerLock);
1024 : }
1025 :
1026 : /*
1027 : * Set the last-start time for the subscription.
1028 : */
1029 : static void
1030 268 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
1031 : {
1032 : LauncherLastStartTimesEntry *entry;
1033 : bool found;
1034 :
1035 268 : logicalrep_launcher_attach_dshmem();
1036 :
1037 268 : entry = dshash_find_or_insert(last_start_times, &subid, &found);
1038 268 : entry->last_start_time = start_time;
1039 268 : dshash_release_lock(last_start_times, entry);
1040 268 : }
1041 :
1042 : /*
1043 : * Return the last-start time for the subscription, or 0 if there isn't one.
1044 : */
1045 : static TimestampTz
1046 310 : ApplyLauncherGetWorkerStartTime(Oid subid)
1047 : {
1048 : LauncherLastStartTimesEntry *entry;
1049 : TimestampTz ret;
1050 :
1051 310 : logicalrep_launcher_attach_dshmem();
1052 :
1053 310 : entry = dshash_find(last_start_times, &subid, false);
1054 310 : if (entry == NULL)
1055 230 : return 0;
1056 :
1057 80 : ret = entry->last_start_time;
1058 80 : dshash_release_lock(last_start_times, entry);
1059 :
1060 80 : return ret;
1061 : }
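Both functions above follow the dshash locking contract: an entry returned by dshash_find() or dshash_find_or_insert() comes back with its partition lock held, and must be handed back with dshash_release_lock() before doing anything that might block. Condensed from the functions above:

	LauncherLastStartTimesEntry *entry;
	bool		found;

	entry = dshash_find_or_insert(last_start_times, &subid, &found);
	entry->last_start_time = GetCurrentTimestamp();
	dshash_release_lock(last_start_times, entry);	/* always pair with find */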
1062 :
1063 : /*
1064 : * Remove the last-start-time entry for the subscription, if one exists.
1065 : *
1066 : * This has two use-cases: to remove the entry related to a subscription
1067 : * that's been deleted or disabled (just to avoid leaking shared memory),
1068 : * and to allow immediate restart of an apply worker that has exited
1069 : * due to subscription parameter changes.
1070 : */
1071 : void
1072 282 : ApplyLauncherForgetWorkerStartTime(Oid subid)
1073 : {
1074 282 : logicalrep_launcher_attach_dshmem();
1075 :
1076 282 : (void) dshash_delete_key(last_start_times, &subid);
1077 282 : }
1078 :
1079 : /*
1080 : * Wake up the launcher on commit if requested.
1081 : */
1082 : void
1083 786030 : AtEOXact_ApplyLauncher(bool isCommit)
1084 : {
1085 786030 : if (isCommit)
1086 : {
1087 738162 : if (on_commit_launcher_wakeup)
1088 254 : ApplyLauncherWakeup();
1089 : }
1090 :
1091 786030 : on_commit_launcher_wakeup = false;
1092 786030 : }
1093 :
1094 : /*
1095 : * Request wakeup of the launcher on commit of the transaction.
1096 : *
1097 : * This is used to signal the launcher to stop sleeping and process the
1098 : * subscriptions when the current transaction commits. It should be used
1099 : * when a new tuple is added to the pg_subscription catalog.
1100 : */
1101 : void
1102 254 : ApplyLauncherWakeupAtCommit(void)
1103 : {
1104 254 : if (!on_commit_launcher_wakeup)
1105 254 : on_commit_launcher_wakeup = true;
1106 254 : }
1107 :
1108 : static void
1109 1060 : ApplyLauncherWakeup(void)
1110 : {
1111 1060 : if (LogicalRepCtx->launcher_pid != 0)
1112 1002 : kill(LogicalRepCtx->launcher_pid, SIGUSR1);
1113 1060 : }
1114 :
1115 : /*
1116 : * Main loop for the apply launcher process.
1117 : */
1118 : void
1119 746 : ApplyLauncherMain(Datum main_arg)
1120 : {
1121 746 : ereport(DEBUG1,
1122 : (errmsg_internal("logical replication launcher started")));
1123 :
1124 746 : before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
1125 :
1126 : Assert(LogicalRepCtx->launcher_pid == 0);
1127 746 : LogicalRepCtx->launcher_pid = MyProcPid;
1128 :
1129 : /* Establish signal handlers. */
1130 746 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
1131 746 : pqsignal(SIGTERM, die);
1132 746 : BackgroundWorkerUnblockSignals();
1133 :
1134 : /*
1135 : * Establish connection to nailed catalogs (we only ever access
1136 : * pg_subscription).
1137 : */
1138 746 : BackgroundWorkerInitializeConnection(NULL, NULL, 0);
1139 :
1140 : /* Enter main loop */
1141 : for (;;)
1142 3760 : {
1143 : int rc;
1144 : List *sublist;
1145 : ListCell *lc;
1146 : MemoryContext subctx;
1147 : MemoryContext oldctx;
1148 4506 : long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
1149 :
1150 4506 : CHECK_FOR_INTERRUPTS();
1151 :
1152 : /* Use temporary context to avoid leaking memory across cycles. */
1153 4474 : subctx = AllocSetContextCreate(TopMemoryContext,
1154 : "Logical Replication Launcher sublist",
1155 : ALLOCSET_DEFAULT_SIZES);
1156 4474 : oldctx = MemoryContextSwitchTo(subctx);
1157 :
1158 : /* Start any missing workers for enabled subscriptions. */
1159 4474 : sublist = get_subscription_list();
1160 5262 : foreach(lc, sublist)
1161 : {
1162 790 : Subscription *sub = (Subscription *) lfirst(lc);
1163 : LogicalRepWorker *w;
1164 : TimestampTz last_start;
1165 : TimestampTz now;
1166 : long elapsed;
1167 :
1168 790 : if (!sub->enabled)
1169 68 : continue;
1170 :
1171 722 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1172 722 : w = logicalrep_worker_find(sub->oid, InvalidOid, false);
1173 722 : LWLockRelease(LogicalRepWorkerLock);
1174 :
1175 722 : if (w != NULL)
1176 412 : continue; /* worker is running already */
1177 :
1178 : /*
1179 : * If the worker is eligible to start now, launch it. Otherwise,
1180 : * adjust wait_time so that we'll wake up as soon as it can be
1181 : * started.
1182 : *
1183 : * Each subscription's apply worker can only be restarted once per
1184 : * wal_retrieve_retry_interval, so that errors do not cause us to
1185 : * repeatedly restart the worker as fast as possible. In cases
1186 : * where a restart is expected (e.g., subscription parameter
1187 : * changes), another process should remove the last-start entry
1188 : * for the subscription so that the worker can be restarted
1189 : * without waiting for wal_retrieve_retry_interval to elapse.
1190 : */
1191 310 : last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
1192 310 : now = GetCurrentTimestamp();
1193 310 : if (last_start == 0 ||
1194 80 : (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
1195 : {
1196 268 : ApplyLauncherSetWorkerStartTime(sub->oid, now);
1197 268 : logicalrep_worker_launch(WORKERTYPE_APPLY,
1198 268 : sub->dbid, sub->oid, sub->name,
1199 : sub->owner, InvalidOid,
1200 : DSM_HANDLE_INVALID);
1201 : }
1202 : else
1203 : {
1204 42 : wait_time = Min(wait_time,
1205 : wal_retrieve_retry_interval - elapsed);
1206 : }
1207 : }
1208 :
1209 : /* Switch back to original memory context. */
1210 4472 : MemoryContextSwitchTo(oldctx);
1211 : /* Clean the temporary memory. */
1212 4472 : MemoryContextDelete(subctx);
1213 :
1214 : /* Wait for more work. */
1215 4472 : rc = WaitLatch(MyLatch,
1216 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1217 : wait_time,
1218 : WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
1219 :
1220 4466 : if (rc & WL_LATCH_SET)
1221 : {
1222 4452 : ResetLatch(MyLatch);
1223 4452 : CHECK_FOR_INTERRUPTS();
1224 : }
1225 :
1226 3760 : if (ConfigReloadPending)
1227 : {
1228 58 : ConfigReloadPending = false;
1229 58 : ProcessConfigFile(PGC_SIGHUP);
1230 : }
1231 : }
1232 :
1233 : /* Not reachable */
1234 : }
1235 :
1236 : /*
1237 : * Is current process the logical replication launcher?
1238 : */
1239 : bool
1240 782 : IsLogicalLauncher(void)
1241 : {
1242 782 : return LogicalRepCtx->launcher_pid == MyProcPid;
1243 : }
1244 :
1245 : /*
1246 : * Return the pid of the leader apply worker if the given pid is the pid of a
1247 : * parallel apply worker; otherwise, return InvalidPid.
1248 : */
1249 : pid_t
1250 1520 : GetLeaderApplyWorkerPid(pid_t pid)
1251 : {
1252 1520 : int leader_pid = InvalidPid;
1253 : int i;
1254 :
1255 1520 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1256 :
1257 7600 : for (i = 0; i < max_logical_replication_workers; i++)
1258 : {
1259 6080 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
1260 :
1261 6080 : if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
1262 : {
1263 0 : leader_pid = w->leader_pid;
1264 0 : break;
1265 : }
1266 : }
1267 :
1268 1520 : LWLockRelease(LogicalRepWorkerLock);
1269 :
1270 1520 : return leader_pid;
1271 : }
1272 :
1273 : /*
1274 : * Returns the state of the subscriptions.
1275 : */
1276 : Datum
1277 2 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
1278 : {
1279 : #define PG_STAT_GET_SUBSCRIPTION_COLS 10
1280 2 : Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
1281 : int i;
1282 2 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1283 :
1284 2 : InitMaterializedSRF(fcinfo, 0);
1285 :
1286 : /* Make sure we get consistent view of the workers. */
1287 2 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1288 :
1289 10 : for (i = 0; i < max_logical_replication_workers; i++)
1290 : {
1291 : /* for each row */
1292 8 : Datum values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1293 8 : bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1294 : int worker_pid;
1295 : LogicalRepWorker worker;
1296 :
1297 8 : memcpy(&worker, &LogicalRepCtx->workers[i],
1298 : sizeof(LogicalRepWorker));
1299 8 : if (!worker.proc || !IsBackendPid(worker.proc->pid))
1300 4 : continue;
1301 :
1302 4 : if (OidIsValid(subid) && worker.subid != subid)
1303 0 : continue;
1304 :
1305 4 : worker_pid = worker.proc->pid;
1306 :
1307 4 : values[0] = ObjectIdGetDatum(worker.subid);
1308 4 : if (isTablesyncWorker(&worker))
1309 0 : values[1] = ObjectIdGetDatum(worker.relid);
1310 : else
1311 4 : nulls[1] = true;
1312 4 : values[2] = Int32GetDatum(worker_pid);
1313 :
1314 4 : if (isParallelApplyWorker(&worker))
1315 0 : values[3] = Int32GetDatum(worker.leader_pid);
1316 : else
1317 4 : nulls[3] = true;
1318 :
1319 4 : if (XLogRecPtrIsInvalid(worker.last_lsn))
1320 2 : nulls[4] = true;
1321 : else
1322 2 : values[4] = LSNGetDatum(worker.last_lsn);
1323 4 : if (worker.last_send_time == 0)
1324 0 : nulls[5] = true;
1325 : else
1326 4 : values[5] = TimestampTzGetDatum(worker.last_send_time);
1327 4 : if (worker.last_recv_time == 0)
1328 0 : nulls[6] = true;
1329 : else
1330 4 : values[6] = TimestampTzGetDatum(worker.last_recv_time);
1331 4 : if (XLogRecPtrIsInvalid(worker.reply_lsn))
1332 2 : nulls[7] = true;
1333 : else
1334 2 : values[7] = LSNGetDatum(worker.reply_lsn);
1335 4 : if (worker.reply_time == 0)
1336 0 : nulls[8] = true;
1337 : else
1338 4 : values[8] = TimestampTzGetDatum(worker.reply_time);
1339 :
1340 4 : switch (worker.type)
1341 : {
1342 4 : case WORKERTYPE_APPLY:
1343 4 : values[9] = CStringGetTextDatum("apply");
1344 4 : break;
1345 0 : case WORKERTYPE_PARALLEL_APPLY:
1346 0 : values[9] = CStringGetTextDatum("parallel apply");
1347 0 : break;
1348 0 : case WORKERTYPE_TABLESYNC:
1349 0 : values[9] = CStringGetTextDatum("table synchronization");
1350 0 : break;
1351 0 : case WORKERTYPE_UNKNOWN:
1352 : /* Should never happen. */
1353 0 : elog(ERROR, "unknown worker type");
1354 : }
1355 :
1356 4 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1357 : values, nulls);
1358 :
1359 : /*
1360 : * If only a single subscription was requested, and we found it,
1361 : * break.
1362 : */
1363 4 : if (OidIsValid(subid))
1364 0 : break;
1365 : }
1366 :
1367 2 : LWLockRelease(LogicalRepWorkerLock);
1368 :
1369 2 : return (Datum) 0;
1370 : }
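pg_stat_get_subscription() is a materialized set-returning function: InitMaterializedSRF() sets up the tuplestore and tuple descriptor, and each tuplestore_putvalues() emits one row. Reduced to a skeleton with hypothetical names:

	Datum
	my_stat_srf(PG_FUNCTION_ARGS)	/* hypothetical SRF */
	{
	#define MY_STAT_SRF_COLS 2
		ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
		Datum		values[MY_STAT_SRF_COLS] = {0};
		bool		nulls[MY_STAT_SRF_COLS] = {0};

		InitMaterializedSRF(fcinfo, 0);

		values[0] = Int32GetDatum(42);	/* illustrative row */
		nulls[1] = true;
		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
							 values, nulls);

		return (Datum) 0;
	}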